Repository: kafka
Updated Branches:
  refs/heads/trunk f37dab76f -> d98ca230a
KAFKA-4397: Refactor Connect backing stores for thread safety

Author: Konstantine Karantasis <konstant...@confluent.io>

Reviewers: Shikhar Bhushan <shik...@confluent.io>, Ewen Cheslack-Postava <e...@confluent.io>

Closes #2123 from kkonstantine/KAFKA-4397-Refactor-connect-backing-stores-for-thread-safety


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d98ca230
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d98ca230
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d98ca230

Branch: refs/heads/trunk
Commit: d98ca230a14b0aedae752fa97f5d55b3a0c49b9c
Parents: f37dab7
Author: Konstantine Karantasis <konstant...@confluent.io>
Authored: Tue Nov 29 15:30:40 2016 -0800
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Tue Nov 29 15:31:14 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/cli/ConnectDistributed.java   |  3 +-
 .../connect/storage/ConfigBackingStore.java     |  3 -
 .../storage/KafkaConfigBackingStore.java        | 75 ++++++++++----------
 .../storage/KafkaStatusBackingStore.java        |  4 +-
 .../storage/MemoryConfigBackingStore.java       |  5 --
 .../storage/KafkaConfigBackingStoreTest.java    | 30 ++++----
 6 files changed, 58 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 7a09ac3..c3a61b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -78,8 +78,7 @@ public class ConnectDistributed {
         StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
         statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
-        configBackingStore.configure(config);
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter(), config);
 
         DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
                 advertisedUrl.toString());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
index 77fc43b..6ab5a1b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -29,8 +28,6 @@ import java.util.concurrent.TimeoutException;
 
 public interface ConfigBackingStore {
 
-    void configure(WorkerConfig config);
-
     void start();
 
     void stop();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index af8efee..1a46693 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -192,23 +192,25 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     private static final long READ_TO_END_TIMEOUT_MS = 30000;
 
     private final Object lock;
-    private boolean starting;
     private final Converter converter;
+    private volatile boolean started;
+    // Although updateListener is not final, it's guaranteed to be visible to any thread after its
+    // initialization as long as we always read the volatile variable "started" before we access the listener.
     private UpdateListener updateListener;
-    private String topic;
+    private final String topic;
     // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Connect
     // format to serialized form
-    private KafkaBasedLog<String, byte[]> configLog;
+    private final KafkaBasedLog<String, byte[]> configLog;
     // Connector -> # of tasks
-    private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+    private final Map<String, Integer> connectorTaskCounts = new HashMap<>();
     // Connector and task configs: name or id -> config map
-    private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
-    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+    private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
-    private Set<String> inconsistent = new HashSet<>();
+    private final Set<String> inconsistent = new HashSet<>();
     // The most recently read offset. This does not take into account deferred task updates/commits, so we may have
     // outstanding data to be applied.
     private volatile long offset;
@@ -218,11 +220,17 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
     private final Map<String, TargetState> connectorTargetStates = new HashMap<>();
 
-    public KafkaConfigBackingStore(Converter converter) {
+    public KafkaConfigBackingStore(Converter converter, WorkerConfig config) {
         this.lock = new Object();
-        this.starting = false;
+        this.started = false;
         this.converter = converter;
         this.offset = -1;
+
+        this.topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
+        if (this.topic.equals(""))
+            throw new ConfigException("Must specify topic for connector configuration.");
+
+        configLog = setupAndCreateKafkaBasedLog(this.topic, config);
     }
 
     @Override
@@ -231,33 +239,12 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     @Override
-    public void configure(WorkerConfig config) {
-        topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
-        if (topic.equals(""))
-            throw new ConfigException("Must specify topic for connector configuration.");
-
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(config.originals());
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
-
-        Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(config.originals());
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-
-        configLog = createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
-    }
-
-    @Override
     public void start() {
         log.info("Starting KafkaConfigBackingStore");
-        // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
+        // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
         // updates can continue to occur in the background
-        starting = true;
         configLog.start();
-        starting = false;
+        started = true;
 
         log.info("Started KafkaConfigBackingStore");
     }
@@ -295,7 +282,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     }
 
     /**
-     * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by
+     * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
      * tailing the Kafka log with a consumer.
      *
      * @param connector name of the connector to write data for
@@ -416,6 +403,22 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
         configLog.send(TARGET_STATE_KEY(connector), serializedTargetState);
     }
 
+    // package private for testing
+    KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, WorkerConfig config) {
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(config.originals());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(config.originals());
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback());
+    }
+
     private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps, Map<String, Object> consumerProps,
                                                               Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
         return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
@@ -480,7 +483,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
                 // Note that we do not notify the update listener if the target state has been removed.
                 // Instead we depend on the removal callback of the connector config itself to notify the worker.
-                if (!starting && !removed)
+                if (started && !removed)
                     updateListener.onConnectorTargetStateChange(connectorName);
 
             } else if (record.key().startsWith(CONNECTOR_PREFIX)) {
@@ -513,7 +516,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         connectorTargetStates.put(connectorName, TargetState.STARTED);
                     }
                 }
-                if (!starting) {
+                if (started) {
                     if (removed)
                         updateListener.onConnectorConfigRemove(connectorName);
                     else
@@ -604,7 +607,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                     connectorTaskCounts.put(connectorName, newTaskCount);
                 }
 
-                if (!starting)
+                if (started)
                     updateListener.onTaskConfigUpdate(updatedTasks);
             } else {
                 log.error("Discarding config update record with invalid key: " + record.key());
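
The new comment on updateListener relies on safe publication through the volatile "started" flag: the volatile write in start() happens-before any later volatile read of it, so a thread that observes started == true is also guaranteed to see the earlier assignment of updateListener. The following is a minimal sketch of that idiom for reference only; it is not part of this patch, the ExampleStore and Listener names are made up, and it assumes the listener is registered before start() is called.

    // Illustrative sketch of publishing a non-volatile field via a volatile flag (not from this commit).
    public class ExampleStore {
        interface Listener {
            void onUpdate(String key);
        }

        private volatile boolean started = false; // volatile flag that publishes "listener"
        private Listener listener;                // non-volatile; safe to read only after "started" is seen true

        public void setListener(Listener listener) {
            this.listener = listener;             // plain write; must happen before start()
        }

        public void start() {
            started = true;                       // volatile write: publishes all prior writes, including "listener"
        }

        // Called from a background consume thread.
        void onRecord(String key) {
            if (started)                          // volatile read first ...
                listener.onUpdate(key);           // ... so this plain read is guaranteed to see the listener
        }
    }
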

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index c377ff6..38a239e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -378,7 +378,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
             if (status == null)
                 return;
 
-            synchronized (KafkaStatusBackingStore.this) {
+            synchronized (this) {
                 log.trace("Received connector {} status update {}", connector, status);
                 CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
                 entry.put(status);
@@ -404,7 +404,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
                 return;
             }
 
-            synchronized (KafkaStatusBackingStore.this) {
+            synchronized (this) {
                 log.trace("Received task {} status update {}", id, status);
                 CacheEntry<TaskStatus> entry = getOrAdd(id);
                 entry.put(status);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 212022d..781b5bf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
@@ -35,10 +34,6 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
     private UpdateListener updateListener;
 
     @Override
-    public void configure(WorkerConfig config) {
-    }
-
-    @Override
     public synchronized void start() {
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d98ca230/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index f5bce8f..4a101c3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -133,6 +133,7 @@ public class KafkaConfigBackingStoreTest {
     KafkaBasedLog<String, byte[]> storeLog;
     private KafkaConfigBackingStore configStorage;
 
+    private String internalTopic;
     private Capture<String> capturedTopic = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
     private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
@@ -142,7 +143,8 @@ public class KafkaConfigBackingStoreTest {
 
     @Before
     public void setUp() {
-        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter);
+        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter, DEFAULT_DISTRIBUTED_CONFIG);
+        Whitebox.setInternalState(configStorage, "configLog", storeLog);
         configStorage.setUpdateListener(configUpdateListener);
     }
 
@@ -154,7 +156,8 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
+
         assertEquals(TOPIC, capturedTopic.getValue());
         assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
         assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -193,7 +196,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Null before writing
@@ -261,8 +264,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -317,7 +319,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -370,7 +372,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector with initial state paused
@@ -412,7 +414,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector with initial state paused
@@ -462,7 +464,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
        configStorage.start();
 
         // Should see a single connector with initial state paused
@@ -501,7 +503,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // The target state deletion should reset the state to STARTED
@@ -548,7 +550,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -602,7 +604,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -649,7 +651,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -712,7 +714,7 @@ public class KafkaConfigBackingStoreTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.setupAndCreateKafkaBasedLog(TOPIC, DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
         // After reading the log, it should have been in an inconsistent state
         ClusterConfigState configState = configStorage.snapshot();
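
For reference, a minimal sketch of how a worker wires up the refactored store after this change: the topic is validated and the KafkaBasedLog is created in the constructor (ConfigBackingStore no longer has a configure() step), and update callbacks are only delivered once start() has flipped the started flag. The workerProps map and the listener variable are assumed for illustration; the constructor call mirrors the ConnectDistributed change above, while listener registration normally happens inside the herder rather than in ConnectDistributed itself.

    // Illustrative wiring only; assumes workerProps (Map<String, String>) and a ConfigBackingStore.UpdateListener impl.
    DistributedConfig config = new DistributedConfig(workerProps);
    Converter converter = worker.getInternalValueConverter();                                // worker as in ConnectDistributed
    ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(converter, config);  // throws ConfigException if config.storage.topic is empty
    configBackingStore.setUpdateListener(listener);                                          // register before start() so the volatile publish covers it
    configBackingStore.start();                                                              // callbacks fire only after this point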