Repository: kafka Updated Branches: refs/heads/trunk fd8eb268d -> 049342e44
KAFKA-3073: Add topic regex support for Connect sinks There are more methods that had to be touched than I anticipated when writing [the KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-215%3A+Add+topic+regex+support+for+Connect+sinks). The implementation here is now complete and includes a test that verifies that there's a call to `consumer.subscribe(Pattern, RebalanceHandler)` when `topics.regex` is provided. Author: Jeff Klukas <[email protected]> Reviewers: Randall Hauch <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #4151 from jklukas/connect-topics.regex Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/049342e4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/049342e4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/049342e4 Branch: refs/heads/trunk Commit: 049342e440a5ca045771f3eb5b4c72d3e52ffac6 Parents: fd8eb26 Author: Jeff Klukas <[email protected]> Authored: Tue Nov 21 16:01:16 2017 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Nov 21 16:01:16 2017 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/connect/sink/SinkTask.java | 8 +++++ .../connect/runtime/SinkConnectorConfig.java | 16 +++++++-- .../apache/kafka/connect/runtime/Worker.java | 14 +++++--- .../kafka/connect/runtime/WorkerSinkTask.java | 32 ++++++++++++++--- .../runtime/distributed/DistributedHerder.java | 5 +-- .../runtime/standalone/StandaloneHerder.java | 16 +++------ .../connect/runtime/WorkerSinkTaskTest.java | 37 ++++++++++++++++++++ .../kafka/connect/runtime/WorkerTest.java | 5 ++- .../distributed/DistributedHerderTest.java | 37 +++++++++++--------- .../standalone/StandaloneHerderTest.java | 9 +++-- docs/connect.html | 5 +-- 11 files changed, 134 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java index 8abff47..1406b30 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java @@ -61,6 +61,14 @@ public abstract class SinkTask implements Task { */ public static final String TOPICS_CONFIG = "topics"; + /** + * <p> + * The configuration key that provides a regex specifying which topics to include as inputs + * for this SinkTask. + * </p> + */ + public static final String TOPICS_REGEX_CONFIG = "topics.regex"; + protected SinkTaskContext context; /** http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index e47d537..cf5564c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.kafka.connect.transforms.util.RegexValidator; import java.util.Map; @@ -27,13 +29,21 @@ import java.util.Map; public class SinkConnectorConfig extends ConnectorConfig { - public static final String TOPICS_CONFIG = "topics"; - private static final String TOPICS_DOC = ""; + public static final String TOPICS_CONFIG = SinkTask.TOPICS_CONFIG; + private static final String TOPICS_DOC = "List of topics to consume, separated by commas"; public static final String TOPICS_DEFAULT = ""; private static final String TOPICS_DISPLAY = "Topics"; + private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG; + private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " + + "Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " + + "Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified."; + public static final String TOPICS_REGEX_DEFAULT = ""; + private static final String TOPICS_REGEX_DISPLAY = "Topics regex"; + static ConfigDef config = ConnectorConfig.configDef() - .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY); + .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) + .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY); public static ConfigDef configDef() { return config; http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index c6e2e17..992825c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -254,17 +254,18 @@ public class Worker { * Get a list of updated task properties for the tasks of this connector. * * @param connName the connector name. - * @param maxTasks the maxinum number of tasks. - * @param sinkTopics a list of sink topics. * @return a list of updated tasks properties. */ - public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) { + public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) { log.trace("Reconfiguring connector tasks for {}", connName); WorkerConnector workerConnector = connectors.get(connName); if (workerConnector == null) throw new ConnectException("Connector " + connName + " not found in this worker."); + int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG); + Map<String, String> connOriginals = connConfig.originalsStrings(); + Connector connector = workerConnector.connector(); List<Map<String, String>> result = new ArrayList<>(); ClassLoader savedLoader = plugins.currentThreadLoader(); @@ -275,8 +276,11 @@ public class Worker { // Ensure we don't modify the connector's copy of the config Map<String, String> taskConfig = new HashMap<>(taskProps); taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); - if (sinkTopics != null) { - taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); + if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) { + taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG)); + } + if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) { + taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG)); } result.add(taskConfig); } http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 234ce8a..05ace58 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -53,6 +54,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import static java.util.Collections.singleton; @@ -258,11 +260,31 @@ class WorkerSinkTask extends WorkerTask { */ protected void initializeAndStart() { String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG); - if (topicsStr == null || topicsStr.isEmpty()) - throw new ConnectException("Sink tasks require a list of topics."); - String[] topics = topicsStr.split(","); - consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); - log.debug("{} Initializing and starting task for topics {}", this, topics); + boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty(); + + String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG); + boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty(); + + if (topicsStrPresent && topicsRegexStrPresent) { + throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG + + " are mutually exclusive options, but both are set."); + } + + if (!topicsStrPresent && !topicsRegexStrPresent) { + throw new ConfigException("Must configure one of " + + SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG); + } + + if (topicsStrPresent) { + String[] topics = topicsStr.split(","); + consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); + log.debug("{} Initializing and starting task for topics {}", this, topics); + } else { + Pattern pattern = Pattern.compile(topicsRegexStr); + consumer.subscribe(pattern, new HandleRebalance()); + log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr); + } + task.initialize(context); task.start(taskConfig); log.info("{} Sink task finished initialization and start", this); http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 79d32da..a1cc56a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -974,16 +974,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable { Map<String, String> configs = configState.connectorConfig(connName); ConnectorConfig connConfig; - List<String> sinkTopics = null; if (worker.isSinkConnector(connName)) { connConfig = new SinkConnectorConfig(plugins(), configs); - sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG); } else { connConfig = new SourceConnectorConfig(plugins(), configs); } - final List<Map<String, String>> taskProps - = worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics); + final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig); boolean changed = false; int currentNumTasks = configState.taskCount(connName); if (taskProps.size() != currentNumTasks) { http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 5d8beab..e9ec0f9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -255,19 +255,11 @@ public class StandaloneHerder extends AbstractHerder { private List<Map<String, String>> recomputeTaskConfigs(String connName) { Map<String, String> config = configState.connectorConfig(connName); - ConnectorConfig connConfig; - if (worker.isSinkConnector(connName)) { - connConfig = new SinkConnectorConfig(plugins(), config); - return worker.connectorTaskConfigs(connName, - connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), - connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG)); - } else { - connConfig = new SourceConnectorConfig(plugins(), config); - return worker.connectorTaskConfigs(connName, - connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), - null); - } + ConnectorConfig connConfig = worker.isSinkConnector(connName) ? + new SinkConnectorConfig(plugins(), config) : + new SourceConnectorConfig(plugins(), config); + return worker.connectorTaskConfigs(connName, connConfig); } private void createConnectorTasks(String connName, TargetState initialState) { http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 50b091d..48d8740 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -67,6 +67,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; import static java.util.Arrays.asList; import static java.util.Collections.singleton; @@ -127,6 +128,7 @@ public class WorkerSinkTaskTest { @Mock private KafkaConsumer<byte[], byte[]> consumer; private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); + private Capture<Pattern> topicsRegex = EasyMock.newCapture(); private long recordsReturnedTp1; private long recordsReturnedTp3; @@ -1143,6 +1145,41 @@ public class WorkerSinkTaskTest { PowerMock.verifyAll(); } + @Test + public void testTopicsRegex() throws Exception { + Map<String, String> props = new HashMap<>(TASK_PROPS); + props.remove("topics"); + props.put("topics.regex", "te.*"); + TaskConfig taskConfig = new TaskConfig(props); + + createTask(TargetState.PAUSED); + + PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); + consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener)); + PowerMock.expectLastCall(); + + sinkTask.initialize(EasyMock.capture(sinkTaskContext)); + PowerMock.expectLastCall(); + sinkTask.start(props); + PowerMock.expectLastCall(); + + expectPollInitialAssignment(); + + Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2)); + EasyMock.expect(consumer.assignment()).andReturn(partitions); + consumer.pause(partitions); + PowerMock.expectLastCall(); + + PowerMock.replayAll(); + + workerTask.initialize(taskConfig); + workerTask.initializeAndStart(); + workerTask.iteration(); + time.sleep(10000L); + + PowerMock.verifyAll(); + } + private void expectInitializeTask() throws Exception { PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener)); http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 80c65df..e78ccc8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -443,7 +443,10 @@ public class WorkerTest extends ThreadedTest { } catch (ConnectException e) { // expected } - List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar")); + Map<String, String> connProps = new HashMap<>(props); + connProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "2"); + ConnectorConfig connConfig = new SinkConnectorConfig(plugins, connProps); + List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, connConfig); Map<String, String> expectedTaskProps = new HashMap<>(); expectedTaskProps.put("foo", "bar"); expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 7483261..d41ccbe 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -168,12 +168,15 @@ public class DistributedHerderTest { private ConfigBackingStore.UpdateListener configUpdateListener; private WorkerRebalanceListener rebalanceListener; + private SinkConnectorConfig conn1SinkConfig; + private SinkConnectorConfig conn1SinkConfigUpdated; + @Before public void setUp() throws Exception { time = new MockTime(); metrics = new MockConnectMetrics(time); worker = PowerMock.createMock(Worker.class); - EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE); + EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE); herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"}, new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time); @@ -181,6 +184,8 @@ public class DistributedHerderTest { configUpdateListener = herder.new ConfigUpdateListener(); rebalanceListener = herder.new RebalanceListener(); plugins = PowerMock.createMock(Plugins.class); + conn1SinkConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG); + conn1SinkConfigUpdated = new SinkConnectorConfig(plugins, CONN1_CONFIG_UPDATED); EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())).andReturn(ConnectorType.SOURCE).anyTimes(); pluginLoader = PowerMock.createMock(PluginClassLoader.class); delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); @@ -205,7 +210,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -232,7 +237,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -248,7 +253,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -279,7 +284,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -558,7 +563,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); // And delete the connector member.wakeup(); @@ -584,7 +589,7 @@ public class DistributedHerderTest { @Test public void testRestartConnector() throws Exception { - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS); // get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("leader"); @@ -740,7 +745,7 @@ public class DistributedHerderTest { @Test public void testRestartTask() throws Exception { - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS); // get the initial assignment EasyMock.expect(member.memberId()).andStubReturn("leader"); @@ -921,7 +926,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -950,7 +955,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -966,7 +971,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -994,7 +999,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -1047,7 +1052,7 @@ public class DistributedHerderTest { // we expect reconfiguration after resuming EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); worker.setTargetState(CONN1, TargetState.STARTED); PowerMock.expectLastCall(); @@ -1241,7 +1246,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.getPlugins()).andReturn(plugins); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); @@ -1317,7 +1322,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS); // list connectors, get connector info, get connector config, get task configs member.wakeup(); @@ -1356,7 +1361,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); - EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); + EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfigUpdated)).andReturn(TASK_CONFIGS); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 1cd1804..18d2739 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -30,6 +30,8 @@ import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.SinkConnectorConfig; +import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskStatus; @@ -479,7 +481,7 @@ public class StandaloneHerderTest { EasyMock.expectLastCall().andReturn(true); EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); // Generate same task config, which should result in no additional action to restart tasks - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null)) + EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig))) .andReturn(singletonList(taskConfig(SourceSink.SOURCE))); worker.isSinkConnector(CONNECTOR_NAME); EasyMock.expectLastCall().andReturn(false); @@ -562,6 +564,9 @@ public class StandaloneHerderTest { private void expectAdd(SourceSink sourceSink) throws Exception { Map<String, String> connectorProps = connectorConfig(sourceSink); + ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ? + new SourceConnectorConfig(plugins, connectorProps) : + new SinkConnectorConfig(plugins, connectorProps); worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); @@ -577,7 +582,7 @@ public class StandaloneHerderTest { Map<String, String> generatedTaskProps = taskConfig(sourceSink); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null)) + EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig)) .andReturn(singletonList(generatedTaskProps)); worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); http://git-wip-us.apache.org/repos/asf/kafka/blob/049342e4/docs/connect.html ---------------------------------------------------------------------- diff --git a/docs/connect.html b/docs/connect.html index 78c66b1..b910cf5 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -95,9 +95,10 @@ <p>The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.</p> - <p>Sink connectors also have one additional option to control their input:</p> + <p>Sink connectors also have a few additional options to control their input. Each sink connector must set one of the following:</p> <ul> - <li><code>topics</code> - A list of topics to use as input for this connector</li> + <li><code>topics</code> - A comma-separated list of topics to use as input for this connector</li> + <li><code>topics.regex</code> - A Java regular expression of topics to use as input for this connector</li> </ul> <p>For any other options, you should consult the documentation for the connector.</p>
