KAFKA-3459: Returning zero task configurations from a connector does not properly clean up existing tasks
hachikuji ewencp Can you take a look when you have time?

Author: Liquan Pei <[email protected]>
Reviewers: Jason Gustafson <[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #1248 from Ishiihara/kafka-3459

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0dedc63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0dedc63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0dedc63

Branch: refs/heads/0.10.0
Commit: d0dedc6314bfd83d9b2b9a9557e3168e715981da
Parents: 096b8b8
Author: Liquan Pei <[email protected]>
Authored: Fri Apr 29 14:49:22 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Fri Apr 29 14:49:22 2016 -0700

----------------------------------------------------------------------
 .../runtime/distributed/ClusterConfigState.java |  13 +-
 .../runtime/distributed/DistributedHerder.java  |  15 +--
 .../runtime/standalone/StandaloneHerder.java    |  16 +--
 .../connect/storage/ConfigBackingStore.java     |   5 +-
 .../storage/KafkaConfigBackingStore.java        |  82 +++++------
 .../storage/MemoryConfigBackingStore.java       |  18 ++-
 .../storage/KafkaConfigBackingStoreTest.java    | 131 ++++++++++++++++---
 7 files changed, 174 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
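Orientation for the per-file diffs that follow: previously putTaskConfigs() took a map keyed by ConnectorTaskId that could span several connectors, and a connector generating zero task configurations contributed nothing to that map, so no commit record was ever written and its existing tasks were never torn down. The patch narrows the contract to one connector plus an ordered list of its task configs, and writes the commit record unconditionally. A minimal model of the resulting record sequence (plain Java standing in for the config topic; the record strings are illustrative, not the on-disk format):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class ZeroTaskCommitSketch {
        public static void main(String[] args) {
            String connector = "connector1";
            List<Map<String, String>> configs = Collections.emptyList(); // zero tasks

            List<String> log = new ArrayList<>();
            // One record per task config -- none here ...
            for (int i = 0; i < configs.size(); i++)
                log.add("task-" + connector + "-" + i);
            // ... and an unconditional commit record carrying the task count. Before
            // this patch no commit was written for an empty update, so readers never
            // learned that the task count had dropped to zero.
            log.add("commit-" + connector + " tasks=" + configs.size());

            System.out.println(log); // [commit-connector1 tasks=0]
        }
    }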
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index c5c217e..ea5ba82 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -22,10 +22,11 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 /**
  * An immutable snapshot of the configuration state of connectors and tasks in a Kafka Connect cluster.
@@ -116,15 +117,15 @@ public class ClusterConfigState {
 
     /**
      * Get all task configs for a connector.
      * @param connector name of the connector
-     * @return a map from the task id to its configuration
+     * @return a list of task configurations
      */
-    public Map<ConnectorTaskId, Map<String, String>> allTaskConfigs(String connector) {
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+    public List<Map<String, String>> allTaskConfigs(String connector) {
+        Map<Integer, Map<String, String>> taskConfigs = new TreeMap<>();
         for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : this.taskConfigs.entrySet()) {
             if (taskConfigEntry.getKey().connector().equals(connector))
-                taskConfigs.put(taskConfigEntry.getKey(), taskConfigEntry.getValue());
+                taskConfigs.put(taskConfigEntry.getKey().task(), taskConfigEntry.getValue());
         }
-        return taskConfigs;
+        return new LinkedList<>(taskConfigs.values());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index cbef186..037eba7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -543,7 +542,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     else if (!configState.contains(connName))
                         callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
                     else {
-                        configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, configs));
+                        configBackingStore.putTaskConfigs(connName, configs);
                         callback.onCompletion(null, null);
                     }
                     return null;
@@ -853,7 +852,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
         if (changed) {
             if (isLeader()) {
-                configBackingStore.putTaskConfigs(connName, taskConfigListAsMap(connName, taskProps));
+                configBackingStore.putTaskConfigs(connName, taskProps);
                 cb.onCompletion(null, null);
             } else {
                 // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
@@ -1064,14 +1063,4 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connName, List<Map<String, String>> configs) {
-        int index = 0;
-        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
-        for (Map<String, String> taskConfigMap : configs) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            result.put(taskId, taskConfigMap);
-            index++;
-        }
-        return result;
-    }
 }
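The TreeMap in allTaskConfigs() above is what makes the returned list positional: entries come back sorted by task number, so two snapshots of the same connector compare equal as lists exactly when every task config matches. A standalone illustration of that ordering guarantee (plain integers stand in for ConnectorTaskId):

    import java.util.Collections;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class AllTaskConfigsOrderingSketch {
        public static void main(String[] args) {
            // Entries inserted out of order, as they might sit in a hash-based snapshot
            Map<Integer, Map<String, String>> byTask = new TreeMap<>();
            byTask.put(1, Collections.singletonMap("file", "b.txt"));
            byTask.put(0, Collections.singletonMap("file", "a.txt"));

            // TreeMap iterates in key order, so the list is [task 0, task 1] no matter
            // the insertion order -- which is what makes the
            // newTaskConfigs.equals(oldTaskConfigs) check in StandaloneHerder reliable
            List<Map<String, String>> ordered = new LinkedList<>(byTask.values());
            System.out.println(ordered); // [{file=a.txt}, {file=b.txt}]
        }
    }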
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index ad02e99..2316bae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -250,20 +249,13 @@ public class StandaloneHerder extends AbstractHerder {
         return connName;
     }
 
-    private Map<ConnectorTaskId, Map<String, String>> recomputeTaskConfigs(String connName) {
+    private List<Map<String, String>> recomputeTaskConfigs(String connName) {
         Map<String, String> config = configState.connectorConfig(connName);
         ConnectorConfig connConfig = new ConnectorConfig(config);
 
-        List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(connName,
+        return worker.connectorTaskConfigs(connName,
                 connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
                 connConfig.getList(ConnectorConfig.TOPICS_CONFIG));
-
-        int i = 0;
-        Map<ConnectorTaskId, Map<String, String>> taskConfigMap = new HashMap<>();
-        for (Map<String, String> taskConfig : taskConfigs)
-            taskConfigMap.put(new ConnectorTaskId(connName, i++), taskConfig);
-
-        return taskConfigMap;
     }
 
     private void createConnectorTasks(String connName, TargetState initialState) {
@@ -296,8 +288,8 @@ public class StandaloneHerder extends AbstractHerder {
             return;
         }
 
-        Map<ConnectorTaskId, Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        Map<ConnectorTaskId, Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
+        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
 
         if (!newTaskConfigs.equals(oldTaskConfigs)) {
             removeConnectorTasks(connName);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index 5244842..77fc43b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -64,9 +65,9 @@ public interface ConfigBackingStore {
     /**
      * Update the task configurations for a connector.
      * @param connector name of the connector
-     * @param configs the new task configs
+     * @param configs the new task configs for the connector
      */
-    void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs);
+    void putTaskConfigs(String connector, List<Map<String, String>> configs);
 
     /**
      * Remove the task configs associated with a connector.
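The interface change above is the pivot of the whole patch: an empty list is now a well-formed argument meaning "this connector has zero tasks". A hand-rolled stub (not the Kafka class) showing the narrowed contract:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class StubConfigBackingStore {
        private final Map<String, List<Map<String, String>>> tasksByConnector = new HashMap<>();

        // Mirrors the revised ConfigBackingStore.putTaskConfigs() signature
        public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
            // An empty list is meaningful: it replaces whatever tasks existed
            tasksByConnector.put(connector, new ArrayList<>(configs));
        }

        public static void main(String[] args) {
            StubConfigBackingStore store = new StubConfigBackingStore();
            store.putTaskConfigs("c1", Collections.singletonList(Collections.singletonMap("k", "v")));
            store.putTaskConfigs("c1", Collections.<Map<String, String>>emptyList());
            System.out.println(store.tasksByConnector.get("c1").size()); // 0 -- tasks cleared
        }
    }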
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 8d20288..9412e42 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -205,6 +205,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     // Connector and task configs: name or id -> config map
     private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
     private Set<String> inconsistent = new HashSet<>();
@@ -339,12 +340,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
      * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
      * that we would be leaving one of the referenced connectors with an inconsistent state.
      *
-     * @param configs map containing task configurations
+     * @param connector the connector to write task configuration
+     * @param configs list of task configurations for the connector
      * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
      *         and task configurations.
      */
     @Override
-    public void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
+    public void putTaskConfigs(String connector, List<Map<String, String>> configs) {
         // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
         // any outstanding lagging data to consume.
         try {
@@ -354,46 +356,33 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
             throw new ConnectException("Error writing root configuration to Kafka", e);
         }
 
-        // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
-        // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
-        // the root config being updated.
-        Map<String, Integer> newTaskCounts = new HashMap<>();
-        synchronized (lock) {
-            // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
-            // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
-            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
-            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
-                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
-                    log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
-                    throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors");
-                }
-                newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
-            }
-        }
+        int taskCount = configs.size();
 
         // Start sending all the individual updates
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+        int index = 0;
+        for (Map<String, String> taskConfig: configs) {
             Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
-            connectConfig.put("properties", taskConfigEntry.getValue());
+            connectConfig.put("properties", taskConfig);
             byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
-            log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue());
-            configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+            log.debug("Writing configuration for task " + index + " configuration: " + taskConfig);
+            ConnectorTaskId connectorTaskId = new ConnectorTaskId(connector, index);
+            configLog.send(TASK_KEY(connectorTaskId), serializedConfig);
+            index++;
         }
 
         // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
         // the end of the log
         try {
             // Read to end to ensure all the task configs have been written
-            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-
-            // Write all the commit messages
-            for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
-                Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
-                connectConfig.put("tasks", taskCountEntry.getValue());
-                byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
-                log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks.");
-                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+            if (taskCount > 0) {
+                configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             }
+            // Write the commit message
+            Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+            connectConfig.put("tasks", taskCount);
+            byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
+            log.debug("Writing commit for connector " + connector + " with " + taskCount + " tasks.");
+            configLog.send(COMMIT_TASKS_KEY(connector), serializedConfig);
 
             // Read to end to ensure all the commit messages have been written
             configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
@@ -426,6 +415,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
     }
 
+    @SuppressWarnings("unchecked")
     private class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
         @Override
         public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
@@ -562,20 +552,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
                         return;
                     }
-
                     Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
 
                     int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
 
                     // Validate the configs we're supposed to update to ensure we're getting a complete configuration
                     // update of all tasks that are expected based on the number of tasks in the commit message.
-                    Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
-                    Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
-                    if (taskIdSet == null) {
-                        //TODO: Figure out why this happens (KAFKA-3321)
-                        log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen.");
-                        return;
-                    }
+                    Set<Integer> taskIdSet = taskIds(connectorName, deferred);
                     if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
                         // Given the logic for writing commit messages, we should only hit this condition due to compacted
                         // historical data, in which case we would not have applied any updates yet and there will be no
@@ -622,19 +605,18 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     /**
-     * Given task configurations, get a set of integer task IDs organized by connector name.
+     * Given task configurations, get a set of integer task IDs for the connector.
      */
-    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
-        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
-        if (configs == null)
-            return connectorTaskIds;
-        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
-            ConnectorTaskId taskId = taskConfigEntry.getKey();
-            if (!connectorTaskIds.containsKey(taskId.connector()))
-                connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
-            connectorTaskIds.get(taskId.connector()).add(taskId.task());
+    private Set<Integer> taskIds(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
+        Set<Integer> tasks = new TreeSet<>();
+        if (configs == null) {
+            return tasks;
+        }
+        for (ConnectorTaskId taskId : configs.keySet()) {
+            assert taskId.connector().equals(connector);
+            tasks.add(taskId.task());
         }
-        return connectorTaskIds;
+        return tasks;
     }
 
     private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
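The rewritten putTaskConfigs() above boils down to a fixed write protocol against the config log: catch up, send one record per task, flush those records (skipped when there are none), send the commit carrying the count, flush again. A sequence-of-operations model (plain Java; no Kafka calls, the KafkaBasedLog is represented by a list of operation names):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class WriteProtocolSketch {
        public static void main(String[] args) {
            List<Map<String, String>> configs = Collections.emptyList(); // zero-task update
            List<String> ops = new ArrayList<>();

            ops.add("readToEnd");                    // consume any lagging data first
            for (int i = 0; i < configs.size(); i++)
                ops.add("send task-" + i);           // one TASK_CONFIGURATION_V0 record per task
            if (!configs.isEmpty())
                ops.add("readToEnd");                // ensure task records land before the commit
            ops.add("send commit tasks=" + configs.size()); // CONNECTOR_TASKS_COMMIT_V0, always sent
            ops.add("readToEnd");                    // ensure the commit is visible

            System.out.println(ops);
            // [readToEnd, send commit tasks=0, readToEnd]
        }
    }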
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index ec5f2e6..212022d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -24,7 +24,9 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
 public class MemoryConfigBackingStore implements ConfigBackingStore {
@@ -108,15 +110,16 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
     }
 
     @Override
-    public synchronized void putTaskConfigs(String connector, Map<ConnectorTaskId, Map<String, String>> configs) {
+    public synchronized void putTaskConfigs(String connector, List<Map<String, String>> configs) {
         ConnectorState state = connectors.get(connector);
         if (state == null)
             throw new IllegalArgumentException("Cannot put tasks for non-existing connector");
 
-        state.taskConfigs = configs;
+        Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
+        state.taskConfigs = taskConfigsMap;
 
         if (updateListener != null)
-            updateListener.onTaskConfigUpdate(configs.keySet());
+            updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
    }
 
     @Override
@@ -151,4 +154,13 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
             this.taskConfigs = new HashMap<>();
         }
     }
+
+    private static Map<ConnectorTaskId, Map<String, String>> taskConfigListAsMap(String connector, List<Map<String, String>> configs) {
+        int index = 0;
+        Map<ConnectorTaskId, Map<String, String>> result = new TreeMap<>();
+        for (Map<String, String> taskConfigMap: configs) {
+            result.put(new ConnectorTaskId(connector, index++), taskConfigMap);
+        }
+        return result;
+    }
 }
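MemoryConfigBackingStore keeps its internal map keyed by ConnectorTaskId, so the incoming list is converted positionally: entry i becomes task i. The same conversion in isolation (a String key stands in for ConnectorTaskId so the snippet compiles without Connect on the classpath):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class TaskConfigListAsMapSketch {
        static Map<String, Map<String, String>> taskConfigListAsMap(String connector,
                                                                    List<Map<String, String>> configs) {
            Map<String, Map<String, String>> result = new TreeMap<>();
            int index = 0;
            for (Map<String, String> taskConfig : configs)
                result.put(connector + "-" + index++, taskConfig);
            return result;
        }

        public static void main(String[] args) {
            System.out.println(taskConfigListAsMap("c1",
                    Arrays.asList(Collections.singletonMap("k", "v")))); // {c1-0={k=v}}
            System.out.println(taskConfigListAsMap("c1",
                    Collections.<Map<String, String>>emptyList()));      // {} -- listener sees an empty key set
        }
    }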
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0dedc63/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index eaad34b..617177e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -57,7 +56,6 @@ import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaConfigBackingStore.class)
@@ -113,6 +111,9 @@ public class KafkaConfigBackingStoreTest {
     private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
             = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
 
+    private static final Struct TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR
+            = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 0);
+
     // The exact format doesn't matter here since both conversions are mocked
     private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
             "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
@@ -275,9 +276,7 @@ public class KafkaConfigBackingStoreTest {
 
         // Writing task task configs should block until all the writes have been performed and the root record update
         // has completed
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
-        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
-        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        List<Map<String, String>> taskConfigs = Arrays.asList(SAMPLE_CONFIGS.get(0), SAMPLE_CONFIGS.get(1));
         configStorage.putTaskConfigs("connector1", taskConfigs);
 
         // Validate root config by listing all connectors and tasks
@@ -296,6 +295,57 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testPutTaskConfigsZeroTasks() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        // Task configs should read to end, write to the log, read to end, write root.
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(0),
+                "tasks", 0); // We have 0 tasks
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Collections.<ConnectorTaskId>emptyList());
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+
+        // Writing task task configs should block until all the writes have been performed and the root record update
+        // has completed
+        List<Map<String, String>> taskConfigs = Collections.emptyList();
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Collections.emptyList(), configState.tasks(connectorName));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRestore() throws Exception {
         // Restoring data should notify only of the latest values after loading is complete. This also validates
         // that inconsistent state is ignored.
@@ -350,10 +400,63 @@ public class KafkaConfigBackingStoreTest {
     }
 
     @Test
+    public void testRestoreZeroTasks() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                // Connector after root update should make it through, task update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)),
+                new ConsumerRecord<>(TOPIC, 0, 7, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(7)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(7), TASKS_COMMIT_STRUCT_ZERO_TASK_CONNECTOR);
+        logOffset = 8;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(8, configState.offset()); // Should always be next to be read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 0 tasks for that connector.
+        assertEquals(Collections.emptyList(), configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
         // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
-        // We start out by loading an initial configuration where we started to write a task update and failed before
-        // writing an the commit, and then compaction cleaned up the earlier record.
+        // We start out by loading an initial configuration where we started to write a task update, and then
+        // compaction cleaned up the earlier record.
         expectConfigure();
 
         List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
@@ -371,9 +474,6 @@ public class KafkaConfigBackingStoreTest {
         logOffset = 6;
         expectStart(existingRecords, deserialized);
 
-        // One failed attempt to write new task configs
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-
         // Successful attempt to write new task config
         expectReadToEnd(new LinkedHashMap<String, byte[]>());
         expectConvertWriteRead(
@@ -392,7 +492,6 @@ public class KafkaConfigBackingStoreTest {
         serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
         expectReadToEnd(serializedConfigs);
 
-
         expectStop();
 
         PowerMock.replayAll();
@@ -410,17 +509,9 @@ public class KafkaConfigBackingStoreTest {
         assertNull(configState.taskConfig(TASK_IDS.get(1)));
         assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
 
-        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
-        try {
-            configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
-            fail("Should have failed due to incomplete task set.");
-        } catch (KafkaException e) {
-            // expected
-        }
-
         // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
         // we are going to shrink the number of tasks to 1
-        configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        configStorage.putTaskConfigs("connector1", Collections.singletonList(SAMPLE_CONFIGS.get(0)));
 
         // Validate updated config
         configState = configStorage.snapshot();
         // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
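From a connector author's perspective, what the new tests lock in is that returning an empty list from taskConfigs() is now a safe way to quiesce a connector: the framework commits a zero task count and tears down any previously running tasks. A plain sketch of that usage (not extending the real SourceConnector class; findPendingWork() is a hypothetical helper):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class IdleConnectorSketch {
        // Mirrors the shape of Connector.taskConfigs(int maxTasks)
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            List<String> pendingWork = findPendingWork();
            if (pendingWork.isEmpty())
                return Collections.emptyList(); // with this fix, existing tasks are cleaned up
            List<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < Math.min(maxTasks, pendingWork.size()); i++)
                configs.add(Collections.singletonMap("work", pendingWork.get(i)));
            return configs;
        }

        private List<String> findPendingWork() {
            return Collections.emptyList(); // placeholder for real discovery logic
        }
    }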
