showuon commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r901473434
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -63,28 +70,109 @@
public class KafkaOffsetBackingStore implements OffsetBackingStore {
private static final Logger log =
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
- private KafkaBasedLog<byte[], byte[]> offsetLog;
- private HashMap<ByteBuffer, ByteBuffer> data;
+ /**
+ * Build a connector-specific offset store with read and write support.
The producer will be {@link Producer#close(Duration) closed}
+ * and the consumer will be {@link Consumer#close(Duration) closed} when
this store is {@link #stop() stopped}, but the topic admin
+ * must be {@link TopicAdmin#close(Duration) closed} by the caller.
+ * @param topic the name of the offsets topic to use
+ * @param producer the producer to use for writing to the offsets topic
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying
metadata for the offsets topic
+ * @return an offset store backed by the given topic and Kafka clients
+ */
+ public static KafkaOffsetBackingStore forTask(
+ String topic,
+ Producer<byte[], byte[]> producer,
+ Consumer<byte[], byte[]> consumer,
+ TopicAdmin topicAdmin
+ ) {
+ return new KafkaOffsetBackingStore(() -> topicAdmin) {
+ @Override
+ public void configure(final WorkerConfig config) {
+ exactlyOnce = config.exactlyOnceSourceEnabled();
+ offsetLog = KafkaBasedLog.withExistingClients(
+ topic,
+ consumer,
+ producer,
+ topicAdmin,
+ consumedCallback,
+ Time.SYSTEM,
+ initialize(topic, newTopicDescription(topic, config))
+ );
+ }
+ };
+ }
+
+ /**
+ * Build a connector-specific offset store with read-only support. The
consumer will be {@link Consumer#close(Duration) closed}
+ * when this store is {@link #stop() stopped}, but the topic admin must be
{@link TopicAdmin#close(Duration) closed} by the caller.
+ * @param topic the name of the offsets topic to use
+ * @param consumer the consumer to use for reading from the offsets topic
+ * @param topicAdmin the topic admin to use for creating and querying
metadata for the offsets topic
+ * @return a read-only offset store backed by the given topic and Kafka
clients
+ */
+ public static KafkaOffsetBackingStore forConnector(
+ String topic,
+ Consumer<byte[], byte[]> consumer,
+ TopicAdmin topicAdmin
+ ) {
+ return new KafkaOffsetBackingStore(() -> topicAdmin) {
+ @Override
+ public void configure(final WorkerConfig config) {
+ exactlyOnce = config.exactlyOnceSourceEnabled();
+ offsetLog = KafkaBasedLog.withExistingClients(
+ topic,
+ consumer,
+ null,
+ topicAdmin,
+ consumedCallback,
+ Time.SYSTEM,
+ initialize(topic, newTopicDescription(topic, config))
+ );
+ }
+ };
+ }
+
+ protected KafkaBasedLog<byte[], byte[]> offsetLog;
+ private final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
private SharedTopicAdmin ownTopicAdmin;
+ protected boolean exactlyOnce;
+ /**
+ * Create an {@link OffsetBackingStore} backed by a Kafka topic. This
constructor will cause the
+ * store to instantiate and close its own {@link TopicAdmin} during {@link
#configure(WorkerConfig)}
+ * and {@link #stop()}, respectively.
+ *
+ * @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
+ */
@Deprecated
public KafkaOffsetBackingStore() {
this.topicAdminSupplier = null;
}
+ /**
+ * Create an {@link OffsetBackingStore} backed by a Kafka topic. This
constructor will use the given
+ * {@link Supplier} to acquire a {@link TopicAdmin} that will be used for
interactions with the backing
+ * Kafka topic. The caller is expected to manage the lifecycle of that
object, including
+ * {@link TopicAdmin#close(Duration) closing} it when it is no longer
needed.
+ * @param topicAdmin a {@link Supplier} for the {@link TopicAdmin} to use
for this backing store;
+ * may not be null, and may not return null
+ */
public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
}
+
@Override
public void configure(final WorkerConfig config) {
String topic =
config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
if (topic == null || topic.trim().length() == 0)
throw new ConfigException("Offset storage topic must be
specified");
+ exactlyOnce = config.exactlyOnceSourceEnabled();
Review Comment:
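Unused variable? `exactlyOnce` is assigned here (and in the `forTask`/`forConnector` overrides of `configure`), but it does not appear to be read anywhere in this diff. Can it be removed, or is it meant to be used somewhere?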
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1425,6 +1428,268 @@ public WorkerTask doBuild(Task task,
}
}
+ // Visible for testing
+ ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
+ SourceConnectorConfig sourceConfig,
+ String connName,
+ Connector connector
+ ) {
+ String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+ Map<String, Object> producerProps = baseProducerConfigs(connName,
"connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+
+ // We use a connector-specific store (i.e., a dedicated
KafkaOffsetBackingStore for this connector)
+ // if the worker supports per-connector offsets topics (which may be
the case in distributed but not standalone mode, for example)
+ // and if the connector is explicitly configured with an offsets topic
+ final boolean usesConnectorSpecificStore =
connectorSpecificOffsetsTopic != null
+ && config.connectorOffsetsTopicsPermitted();
+
+ if (usesConnectorSpecificStore) {
+ Map<String, Object> consumerProps =
regularSourceOffsetsConsumerConfigs(
+ connName, "connector-consumer-" + connName, config,
sourceConfig, connector.getClass(),
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerProps);
+
+ Map<String, Object> adminOverrides = adminConfigs(connName,
"connector-adminclient-" + connName, config,
+ sourceConfig, connector.getClass(),
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+ TopicAdmin admin = new TopicAdmin(adminOverrides);
+ KafkaOffsetBackingStore connectorStore =
+
KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer,
admin);
+
+ // If the connector's offsets topic is the same as the
worker-global offsets topic, there's no need to construct
+ // an offset store that has a primary and a secondary store which
both read from that same topic.
+ // So, if the user has explicitly configured the connector with a
connector-specific offsets topic
+ // but we know that that topic is the same as the worker-global
offsets topic, we ignore the worker-global
+ // offset store and build a store backed exclusively by a
connector-specific offsets store.
+ // It may seem reasonable to instead build a store backed
exclusively by the worker-global offset store, but that
+ // would prevent users from being able to customize the config
properties used for the Kafka clients that
+ // access the offsets topic, and we would not be able to establish
reasonable defaults like setting
+ // isolation.level=read_committed for the offsets topic consumer
for this connector
+ if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic,
producerProps)) {
+ return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+ () -> LoggingContext.forConnector(connName),
+ connectorStore,
+ connectorSpecificOffsetsTopic,
+ admin
+ );
+ } else {
+ return
ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+ () -> LoggingContext.forConnector(connName),
+ globalOffsetBackingStore,
+ connectorStore,
+ connectorSpecificOffsetsTopic,
+ admin
+ );
+ }
+ } else {
+ return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+ () -> LoggingContext.forConnector(connName),
+ globalOffsetBackingStore,
+ config.offsetsTopic()
+ );
+ }
+ }
+
+ // Visible for testing
+ ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(
+ SourceConnectorConfig sourceConfig,
+ String connName,
+ Connector connector
+ ) {
+ String connectorSpecificOffsetsTopic =
Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+ Map<String, Object> producerProps = baseProducerConfigs(connName,
"connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+
+ Map<String, Object> consumerProps =
exactlyOnceSourceOffsetsConsumerConfigs(
+ connName, "connector-consumer-" + connName, config,
sourceConfig, connector.getClass(),
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerProps);
+
+ Map<String, Object> adminOverrides = adminConfigs(connName,
"connector-adminclient-" + connName, config,
+ sourceConfig, connector.getClass(),
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+ TopicAdmin admin = new TopicAdmin(adminOverrides);
+ KafkaOffsetBackingStore connectorStore =
+
KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer,
admin);
+
+ // If the connector's offsets topic is the same as the worker-global
offsets topic, there's no need to construct
+ // an offset store that has a primary and a secondary store which both
read from that same topic.
+ // So, even if the user has explicitly configured the connector with a
connector-specific offsets topic,
+ // if we know that that topic is the same as the worker-global offsets
topic, we ignore the worker-global
+ // offset store and build a store backed exclusively by a
connector-specific offsets store.
+ // It may seem reasonable to instead build a store backed exclusively
by the worker-global offset store, but that
+ // would prevent users from being able to customize the config
properties used for the Kafka clients that
+ // access the offsets topic, and may lead to confusion for them when
tasks are created for the connector
+ // since they will all have their own dedicated offsets stores anyways
+ if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic,
producerProps)) {
+ return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+ () -> LoggingContext.forConnector(connName),
+ connectorStore,
+ connectorSpecificOffsetsTopic,
+ admin
+ );
+ } else {
+ return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+ () -> LoggingContext.forConnector(connName),
+ globalOffsetBackingStore,
+ connectorStore,
+ connectorSpecificOffsetsTopic,
+ admin
+ );
+ }
+ }
+
+ // Visible for testing
+ ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(
+ ConnectorTaskId id,
+ SourceConnectorConfig sourceConfig,
+ Class<? extends Connector> connectorClass,
+ Producer<byte[], byte[]> producer,
+ Map<String, Object> producerProps,
+ TopicAdmin topicAdmin
+ ) {
+ String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+ if (regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConfig)) {
+ Objects.requireNonNull(topicAdmin, "Source tasks require a
non-null topic admin when configured to use their own offsets topic");
+
+ Map<String, Object> consumerProps =
regularSourceOffsetsConsumerConfigs(
+ id.connector(), "connector-consumer-" + id, config,
sourceConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerProps);
+
+ KafkaOffsetBackingStore connectorStore =
+
KafkaOffsetBackingStore.forTask(sourceConfig.offsetsTopic(), producer,
consumer, topicAdmin);
+
+ // If the connector's offsets topic is the same as the
worker-global offsets topic, there's no need to construct
+ // an offset store that has a primary and a secondary store which
both read from that same topic.
+ // So, if the user has (implicitly or explicitly) configured the
connector with a connector-specific offsets topic
+ // but we know that that topic is the same as the worker-global
offsets topic, we ignore the worker-global
+ // offset store and build a store backed exclusively by a
connector-specific offsets store.
+ // It may seem reasonable to instead build a store backed
exclusively by the worker-global offset store, but that
+ // would prevent users from being able to customize the config
properties used for the Kafka clients that
+ // access the offsets topic, and we would not be able to establish
reasonable defaults like setting
+ // isolation.level=read_committed for the offsets topic consumer
for this task
+ if (sameOffsetTopicAsWorker(sourceConfig.offsetsTopic(),
producerProps)) {
+ return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+ () -> LoggingContext.forTask(id),
+ connectorStore,
+ connectorSpecificOffsetsTopic,
+ topicAdmin
+ );
+ } else {
+ return
ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+ () -> LoggingContext.forTask(id),
+ globalOffsetBackingStore,
+ connectorStore,
+ connectorSpecificOffsetsTopic,
+ topicAdmin
+ );
+ }
+ } else {
+ return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+ () -> LoggingContext.forTask(id),
+ globalOffsetBackingStore,
+ config.offsetsTopic()
+ );
+ }
+ }
+
+ // Visible for testing
+ ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask(
+ ConnectorTaskId id,
+ SourceConnectorConfig sourceConfig,
+ Class<? extends Connector> connectorClass,
+ Producer<byte[], byte[]> producer,
+ Map<String, Object> producerProps,
+ TopicAdmin topicAdmin
+ ) {
+ Objects.requireNonNull(topicAdmin, "Source tasks require a non-null
topic admin when exactly-once support is enabled");
+
+ Map<String, Object> consumerProps =
exactlyOnceSourceOffsetsConsumerConfigs(
+ id.connector(), "connector-consumer-" + id, config,
sourceConfig, connectorClass,
+ connectorClientConfigOverridePolicy, kafkaClusterId);
+ KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(consumerProps);
+
+ String connectorOffsetsTopic =
Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+ KafkaOffsetBackingStore connectorStore =
+ KafkaOffsetBackingStore.forTask(connectorOffsetsTopic,
producer, consumer, topicAdmin);
+
+ // If the connector's offsets topic is the same as the worker-global
offsets topic, there's no need to construct
+ // an offset store that has a primary and a secondary store which both
read from that same topic.
+ // So, if the user has (implicitly or explicitly) configured the
connector with a connector-specific offsets topic
+ // but we know that that topic is the same as the worker-global
offsets topic, we ignore the worker-global
+ // offset store and build a store backed exclusively by a
connector-specific offsets store.
+ // We cannot under any circumstances build an offset store backed
exclusively by the worker-global offset store
+ // as that would prevent us from being able to write source records
and source offset information for the task
+ // with the same producer, and therefore, in the same transaction.
+ if (sameOffsetTopicAsWorker(connectorOffsetsTopic, producerProps)) {
+ return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+ () -> LoggingContext.forTask(id),
+ connectorStore,
+ connectorOffsetsTopic,
+ topicAdmin
+ );
+ } else {
+ return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+ () -> LoggingContext.forTask(id),
+ globalOffsetBackingStore,
+ connectorStore,
+ connectorOffsetsTopic,
+ topicAdmin
+ );
+ }
+ }
+
+ /**
+ * Gives a best-effort guess for whether the given offsets topic is the
same topic as the worker-global offsets topic.
+ * Even if the name of the topic is the same as the name of the worker's
offsets topic, the two may still be different topics
+ * if the connector is configured to produce to a different Kafka cluster
than the one that hosts the worker's offsets topic.
+ * @param offsetsTopic the name of the offsets topic for the connector
+ * @param producerProps the producer configuration for the connector
+ * @return whether it appears that the connector's offsets topic is the
same topic as the worker-global offsets topic.
+ * If {@code true}, it is guaranteed that the two are the same;
+ * if {@code false}, it is likely but not guaranteed that the two are not
the same
+ */
+ private boolean sameOffsetTopicAsWorker(String offsetsTopic, Map<String,
Object> producerProps) {
+ // We can check the offset topic name and the Kafka cluster's
bootstrap servers,
+ // although this isn't exact and can lead to some false negatives if
the user
+ // provides an overridden bootstrap servers value for their producer
that is different than
+ // the worker's but still resolves to the same Kafka cluster used by
the worker.
+ // At the moment this is probably adequate, especially since we don't
want to put
+ // a network ping to a remote Kafka cluster inside the herder's tick
thread (which is where this
+ // logic takes place right now) in case that takes a while.
+ Set<String> workerBootstrapServers = new
HashSet<>(config.getList(BOOTSTRAP_SERVERS_CONFIG));
+ Set<String> producerBootstrapServers = new HashSet<>();
+ try {
+ String rawBootstrapServers =
producerProps.getOrDefault(BOOTSTRAP_SERVERS_CONFIG, "").toString();
+ @SuppressWarnings("unchecked")
+ List<String> parsedBootstrapServers = (List<String>)
ConfigDef.parseType(BOOTSTRAP_SERVERS_CONFIG, rawBootstrapServers,
ConfigDef.Type.LIST);
+ producerBootstrapServers.addAll(parsedBootstrapServers);
+ } catch (Exception e) {
+ // Should never happen by this point, but if it does, make sure to
present a readable error message to the user
+ throw new ConnectException("Failed to parse bootstrap servers
property in producer config", e);
+ }
+ producerBootstrapServers = new HashSet<>(producerBootstrapServers);
Review Comment:
We've already added all of the parsed bootstrap servers to the `producerBootstrapServers` set at L1672. Why do we create another `HashSet` from it here? The extra copy looks redundant and could be removed.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -16,56 +16,325 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+/**
+ * An {@link OffsetBackingStore} with support for reading from and writing to
a worker-global
+ * offset backing store and/or a connector-specific offset backing store.
+ */
public class ConnectorOffsetBackingStore implements OffsetBackingStore {
- private final OffsetBackingStore workerStore;
+ private static final Logger log =
LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);
+
+ /**
+ * Builds an offset store that uses a connector-specific offset topic as
the primary store and
+ * the worker-global offset store as the secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext}
that should be used
+ * for messages logged by this offset store; may not
be null, and may never return null
+ * @param workerStore the worker-global offset store; may not be null
+ * @param connectorStore the connector-specific offset store; may not be
null
+ * @param connectorOffsetsTopic the name of the connector-specific offset
topic; may not be null
+ * @param connectorStoreAdmin the topic admin to use for the
connector-specific offset topic; may not be null
+ * @return an offset store backed primarily by the connector-specific
offset topic and secondarily
+ * by the worker-global offset store; never null
+ */
+ public static ConnectorOffsetBackingStore withConnectorAndWorkerStores(
+ Supplier<LoggingContext> loggingContext,
+ OffsetBackingStore workerStore,
+ KafkaOffsetBackingStore connectorStore,
+ String connectorOffsetsTopic,
+ TopicAdmin connectorStoreAdmin
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(workerStore);
+ Objects.requireNonNull(connectorStore);
+ Objects.requireNonNull(connectorOffsetsTopic);
+ Objects.requireNonNull(connectorStoreAdmin);
+ return new ConnectorOffsetBackingStore(
+ Time.SYSTEM,
+ loggingContext,
+ connectorOffsetsTopic,
+ workerStore,
+ connectorStore,
+ connectorStoreAdmin
+ );
+ }
+
+ /**
+ * Builds an offset store that uses the worker-global offset store as the
primary store, and no secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext}
that should be used
+ * for messages logged by this offset store; may not
be null, and may never return null
+ * @param workerStore the worker-global offset store; may not be null
+ * @param workerOffsetsTopic the name of the worker-global offset topic;
may be null if the worker
+ * does not use an offset topic for its offset
store
+ * @return an offset store for the connector backed solely by the
worker-global offset store; never null
+ */
+ public static ConnectorOffsetBackingStore withOnlyWorkerStore(
+ Supplier<LoggingContext> loggingContext,
+ OffsetBackingStore workerStore,
+ String workerOffsetsTopic
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(workerStore);
+ return new ConnectorOffsetBackingStore(Time.SYSTEM, loggingContext,
workerOffsetsTopic, workerStore, null, null);
+ }
+
+ /**
+ * Builds an offset store that uses a connector-specific offset topic as
the primary store, and no secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext}
that should be used
+ * for messages logged by this offset store; may not
be null, and may never return null
+ * @param connectorStore the connector-specific offset store; may not be
null
+ * @param connectorOffsetsTopic the name of the connector-specific offset
topic; may not be null
+ * @param connectorStoreAdmin the topic admin to use for the
connector-specific offset topic; may not be null
+ * @return an offset store for the connector backed solely by the
connector-specific offset topic; never null
+ */
+ public static ConnectorOffsetBackingStore withOnlyConnectorStore(
+ Supplier<LoggingContext> loggingContext,
+ KafkaOffsetBackingStore connectorStore,
+ String connectorOffsetsTopic,
+ TopicAdmin connectorStoreAdmin
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(connectorOffsetsTopic);
+ Objects.requireNonNull(connectorStoreAdmin);
+ return new ConnectorOffsetBackingStore(
+ Time.SYSTEM,
+ loggingContext,
+ connectorOffsetsTopic,
+ null,
+ connectorStore,
+ connectorStoreAdmin
+ );
+ }
+
+ private final Time time;
+ private final Supplier<LoggingContext> loggingContext;
private final String primaryOffsetsTopic;
+ private final Optional<OffsetBackingStore> workerStore;
+ private final Optional<KafkaOffsetBackingStore> connectorStore;
+ private final Optional<TopicAdmin> connectorStoreAdmin;
- public ConnectorOffsetBackingStore(
+ ConnectorOffsetBackingStore(
+ Time time,
+ Supplier<LoggingContext> loggingContext,
+ String primaryOffsetsTopic,
OffsetBackingStore workerStore,
- String primaryOffsetsTopic
+ KafkaOffsetBackingStore connectorStore,
+ TopicAdmin connectorStoreAdmin
) {
- this.workerStore = workerStore;
+ if (workerStore == null && connectorStore == null) {
+ throw new IllegalArgumentException("At least one non-null offset
store must be provided");
+ }
+ this.time = time;
+ this.loggingContext = loggingContext;
this.primaryOffsetsTopic = primaryOffsetsTopic;
+ this.workerStore = Optional.ofNullable(workerStore);
+ this.connectorStore = Optional.ofNullable(connectorStore);
+ this.connectorStoreAdmin = Optional.ofNullable(connectorStoreAdmin);
}
public String primaryOffsetsTopic() {
return primaryOffsetsTopic;
}
+ /**
+ * If configured to use a connector-specific offset store, {@link
OffsetBackingStore#start() start} that store.
+ *
+ * <p>The worker-global offset store is not modified; it is the caller's
responsibility to ensure that it is started
+ * before calls to {@link #get(Collection)} and {@link #set(Map,
Callback)} take place.
+ */
@Override
public void start() {
- // TODO
+ // Worker offset store should already be started
+ connectorStore.ifPresent(OffsetBackingStore::start);
}
+ /**
+ * If configured to use a connector-specific offset store, {@link
OffsetBackingStore#start() stop} that store,
Review Comment:
Typo in the Javadoc link: the link text says "stop", so the target should be `OffsetBackingStore#stop()` rather than `OffsetBackingStore#start()`.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -16,56 +16,325 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+/**
+ * An {@link OffsetBackingStore} with support for reading from and writing to
a worker-global
+ * offset backing store and/or a connector-specific offset backing store.
+ */
public class ConnectorOffsetBackingStore implements OffsetBackingStore {
- private final OffsetBackingStore workerStore;
+ private static final Logger log =
LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);
+
+ /**
+ * Builds an offset store that uses a connector-specific offset topic as
the primary store and
+ * the worker-global offset store as the secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext}
that should be used
+ * for messages logged by this offset store; may not
be null, and may never return null
+ * @param workerStore the worker-global offset store; may not be null
+ * @param connectorStore the connector-specific offset store; may not be
null
+ * @param connectorOffsetsTopic the name of the connector-specific offset
topic; may not be null
+ * @param connectorStoreAdmin the topic admin to use for the
connector-specific offset topic; may not be null
+ * @return an offset store backed primarily by the connector-specific
offset topic and secondarily
+ * by the worker-global offset store; never null
+ */
+ public static ConnectorOffsetBackingStore withConnectorAndWorkerStores(
+ Supplier<LoggingContext> loggingContext,
+ OffsetBackingStore workerStore,
+ KafkaOffsetBackingStore connectorStore,
+ String connectorOffsetsTopic,
+ TopicAdmin connectorStoreAdmin
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(workerStore);
+ Objects.requireNonNull(connectorStore);
+ Objects.requireNonNull(connectorOffsetsTopic);
+ Objects.requireNonNull(connectorStoreAdmin);
+ return new ConnectorOffsetBackingStore(
+ Time.SYSTEM,
+ loggingContext,
+ connectorOffsetsTopic,
+ workerStore,
+ connectorStore,
+ connectorStoreAdmin
+ );
+ }
+
+ /**
+ * Builds an offset store that uses the worker-global offset store as the
primary store, and no secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext}
that should be used
+ * for messages logged by this offset store; may not
be null, and may never return null
+ * @param workerStore the worker-global offset store; may not be null
+ * @param workerOffsetsTopic the name of the worker-global offset topic;
may be null if the worker
+ * does not use an offset topic for its offset
store
+ * @return an offset store for the connector backed solely by the
worker-global offset store; never null
+ */
+ public static ConnectorOffsetBackingStore withOnlyWorkerStore(
+ Supplier<LoggingContext> loggingContext,
+ OffsetBackingStore workerStore,
+ String workerOffsetsTopic
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(workerStore);
+ return new ConnectorOffsetBackingStore(Time.SYSTEM, loggingContext,
workerOffsetsTopic, workerStore, null, null);
+ }
+
+ /**
+ * Builds an offset store that uses a connector-specific offset topic as
the primary store, and no secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext}
that should be used
+ * for messages logged by this offset store; may not
be null, and may never return null
+ * @param connectorStore the connector-specific offset store; may not be
null
+ * @param connectorOffsetsTopic the name of the connector-specific offset
topic; may not be null
+ * @param connectorStoreAdmin the topic admin to use for the
connector-specific offset topic; may not be null
+ * @return an offset store for the connector backed solely by the
connector-specific offset topic; never null
+ */
+ public static ConnectorOffsetBackingStore withOnlyConnectorStore(
+ Supplier<LoggingContext> loggingContext,
+ KafkaOffsetBackingStore connectorStore,
+ String connectorOffsetsTopic,
+ TopicAdmin connectorStoreAdmin
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(connectorOffsetsTopic);
+ Objects.requireNonNull(connectorStoreAdmin);
+ return new ConnectorOffsetBackingStore(
+ Time.SYSTEM,
+ loggingContext,
+ connectorOffsetsTopic,
+ null,
+ connectorStore,
+ connectorStoreAdmin
+ );
+ }
+
+ private final Time time;
+ private final Supplier<LoggingContext> loggingContext;
private final String primaryOffsetsTopic;
+ private final Optional<OffsetBackingStore> workerStore;
+ private final Optional<KafkaOffsetBackingStore> connectorStore;
+ private final Optional<TopicAdmin> connectorStoreAdmin;
- public ConnectorOffsetBackingStore(
+ ConnectorOffsetBackingStore(
+ Time time,
+ Supplier<LoggingContext> loggingContext,
+ String primaryOffsetsTopic,
OffsetBackingStore workerStore,
- String primaryOffsetsTopic
+ KafkaOffsetBackingStore connectorStore,
+ TopicAdmin connectorStoreAdmin
) {
- this.workerStore = workerStore;
+ if (workerStore == null && connectorStore == null) {
+ throw new IllegalArgumentException("At least one non-null offset
store must be provided");
+ }
+ this.time = time;
+ this.loggingContext = loggingContext;
this.primaryOffsetsTopic = primaryOffsetsTopic;
+ this.workerStore = Optional.ofNullable(workerStore);
+ this.connectorStore = Optional.ofNullable(connectorStore);
+ this.connectorStoreAdmin = Optional.ofNullable(connectorStoreAdmin);
}
public String primaryOffsetsTopic() {
return primaryOffsetsTopic;
}
+ /**
+ * If configured to use a connector-specific offset store, {@link
OffsetBackingStore#start() start} that store.
+ *
+ * <p>The worker-global offset store is not modified; it is the caller's
responsibility to ensure that it is started
+ * before calls to {@link #get(Collection)} and {@link #set(Map,
Callback)} take place.
+ */
@Override
public void start() {
- // TODO
+ // Worker offset store should already be started
+ connectorStore.ifPresent(OffsetBackingStore::start);
}
+ /**
+ * If configured to use a connector-specific offset store, {@link
OffsetBackingStore#start() stop} that store,
+ * and the {@link TopicAdmin} used by that store.
Review Comment:
This sentence is missing a verb. Suggest: "and close the {@link TopicAdmin} used by that store."
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -16,56 +16,325 @@
*/
package org.apache.kafka.connect.storage;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+/**
+ * An {@link OffsetBackingStore} with support for reading from and writing to
a worker-global
+ * offset backing store and/or a connector-specific offset backing store.
+ */
public class ConnectorOffsetBackingStore implements OffsetBackingStore {
- private final OffsetBackingStore workerStore;
+ private static final Logger log =
LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);
+
+ /**
+ * Builds an offset store that uses a connector-specific offset topic as the primary store and
+ * the worker-global offset store as the secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used
+ * for messages logged by this offset store; may not be null, and may never return null
+ * @param workerStore the worker-global offset store; may not be null
+ * @param connectorStore the connector-specific offset store; may not be null
+ * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null
+ * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null
+ * @return an offset store backed primarily by the connector-specific offset topic and secondarily
+ * by the worker-global offset store; never null
+ * @throws NullPointerException if any of the arguments is null
+ */
+ public static ConnectorOffsetBackingStore withConnectorAndWorkerStores(
+ Supplier<LoggingContext> loggingContext,
+ OffsetBackingStore workerStore,
+ KafkaOffsetBackingStore connectorStore,
+ String connectorOffsetsTopic,
+ TopicAdmin connectorStoreAdmin
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(workerStore);
+ Objects.requireNonNull(connectorStore);
+ Objects.requireNonNull(connectorOffsetsTopic);
+ Objects.requireNonNull(connectorStoreAdmin);
+ return new ConnectorOffsetBackingStore(
+ Time.SYSTEM,
+ loggingContext,
+ connectorOffsetsTopic,
+ workerStore,
+ connectorStore,
+ connectorStoreAdmin
+ );
+ }
+
+ /**
+ * Builds an offset store that uses the worker-global offset store as the primary store, and no secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used
+ * for messages logged by this offset store; may not be null, and may never return null
+ * @param workerStore the worker-global offset store; may not be null
+ * @param workerOffsetsTopic the name of the worker-global offset topic; may be null if the worker
+ * does not use an offset topic for its offset store
+ * @return an offset store for the connector backed solely by the worker-global offset store; never null
+ * @throws NullPointerException if {@code loggingContext} or {@code workerStore} is null
+ */
+ public static ConnectorOffsetBackingStore withOnlyWorkerStore(
+ Supplier<LoggingContext> loggingContext,
+ OffsetBackingStore workerStore,
+ String workerOffsetsTopic
+ ) {
+ Objects.requireNonNull(loggingContext);
+ Objects.requireNonNull(workerStore);
+ // No connector-specific store or admin: both are passed as null
+ return new ConnectorOffsetBackingStore(Time.SYSTEM, loggingContext, workerOffsetsTopic, workerStore, null, null);
+ }
+
+ /**
+ * Builds an offset store that uses a connector-specific offset topic as the primary store, and no secondary store.
+ *
+ * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used
+ * for messages logged by this offset store; may not be null, and may never return null
+ * @param connectorStore the connector-specific offset store; may not be null
+ * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null
+ * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null
+ * @return an offset store for the connector backed solely by the connector-specific offset topic; never null
+ * @throws NullPointerException if any of the arguments is null
+ */
+ public static ConnectorOffsetBackingStore withOnlyConnectorStore(
+ Supplier<LoggingContext> loggingContext,
+ KafkaOffsetBackingStore connectorStore,
+ String connectorOffsetsTopic,
+ TopicAdmin connectorStoreAdmin
+ ) {
+ Objects.requireNonNull(loggingContext);
+ // Documented as non-null; checked here like in withConnectorAndWorkerStores so callers get an NPE
+ // naming the real problem instead of the constructor's generic "at least one store" error
+ Objects.requireNonNull(connectorStore);
+ Objects.requireNonNull(connectorOffsetsTopic);
+ Objects.requireNonNull(connectorStoreAdmin);
+ return new ConnectorOffsetBackingStore(
+ Time.SYSTEM,
+ loggingContext,
+ connectorOffsetsTopic,
+ null,
+ connectorStore,
+ connectorStoreAdmin
+ );
+ }
+
+ private final Time time;
+ private final Supplier<LoggingContext> loggingContext;
private final String primaryOffsetsTopic;
+ private final Optional<OffsetBackingStore> workerStore;
+ private final Optional<KafkaOffsetBackingStore> connectorStore;
+ private final Optional<TopicAdmin> connectorStoreAdmin;
+ /**
+ * Creates a store from an optional worker-global store and an optional connector-specific store.
+ * The static factories above call this with {@link Time#SYSTEM}; presumably it is package-private
+ * so that tests can supply a different {@link Time} (TODO confirm).
+ *
+ * @param time clock used to track the remaining budget in timed {@code get} calls
+ * @param loggingContext supplier of the logging context for messages logged by this store
+ * @param primaryOffsetsTopic the name of the primary offsets topic; may be null
+ * @param workerStore the worker-global offset store; may be null if {@code connectorStore} is not
+ * @param connectorStore the connector-specific offset store; may be null if {@code workerStore} is not
+ * @param connectorStoreAdmin the topic admin for the connector-specific store; may be null
+ * @throws IllegalArgumentException if both {@code workerStore} and {@code connectorStore} are null
+ */
- public ConnectorOffsetBackingStore(
+ ConnectorOffsetBackingStore(
+ Time time,
+ Supplier<LoggingContext> loggingContext,
+ String primaryOffsetsTopic,
OffsetBackingStore workerStore,
- String primaryOffsetsTopic
+ KafkaOffsetBackingStore connectorStore,
+ TopicAdmin connectorStoreAdmin
) {
- this.workerStore = workerStore;
+ // With neither store present, get() and set() would have nothing to read from or write to
+ if (workerStore == null && connectorStore == null) {
+ throw new IllegalArgumentException("At least one non-null offset store must be provided");
+ }
+ this.time = time;
+ this.loggingContext = loggingContext;
this.primaryOffsetsTopic = primaryOffsetsTopic;
+ this.workerStore = Optional.ofNullable(workerStore);
+ this.connectorStore = Optional.ofNullable(connectorStore);
+ this.connectorStoreAdmin = Optional.ofNullable(connectorStoreAdmin);
}
+ /**
+ * Get the name of the topic that primary offset reads and writes go to.
+ *
+ * @return the primary offsets topic; may be null when this store is backed solely by a worker-global
+ * offset store that does not use an offsets topic
+ */
public String primaryOffsetsTopic() {
return primaryOffsetsTopic;
}
+ /**
+ * If configured to use a connector-specific offset store, {@link OffsetBackingStore#start() start} that store.
+ *
+ * <p>The worker-global offset store is not modified; it is the caller's responsibility to ensure that it is started
+ * before calls to {@link #get(Collection)} and {@link #set(Map, Callback)} take place.
+ */
@Override
public void start() {
- // TODO
+ // Worker offset store should already be started
+ connectorStore.ifPresent(OffsetBackingStore::start);
}
+ /**
+ * If configured to use a connector-specific offset store, {@link OffsetBackingStore#stop() stop} that store,
+ * and close the {@link TopicAdmin} used by that store (if one was provided).
+ *
+ * <p>The worker-global offset store is not modified as it may be used for other connectors that either already exist,
+ * or will be created, on this worker.
+ */
@Override
public void stop() {
- // TODO
+ // Worker offset store should not be stopped as it may be used for multiple connectors
+ connectorStore.ifPresent(OffsetBackingStore::stop);
+ connectorStoreAdmin.ifPresent(TopicAdmin::close);
}
+ /**
+ * Get the offset values for the specified keys.
+ *
+ * <p>If configured to use a connector-specific offset store, priority is given to the values contained in that store,
+ * and the values in the worker-global offset store (if one is provided) are used as a fallback for keys that are not
+ * present in the connector-specific store.
+ *
+ * <p>If not configured to use a connector-specific offset store, only the values contained in the worker-global
+ * offset store are returned.
+ *
+ * <p>A timed {@code get} on the returned future applies the timeout to both underlying reads combined,
+ * not to each read separately.
+ *
+ * @param keys list of keys to look up
+ * @return future for the resulting map from key to value
+ */
@Override
public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
- // TODO
- return workerStore.get(keys);
+ // NOTE(review): getFromStore is defined outside this excerpt; presumably it yields an empty map
+ // for an absent store — confirm
+ Future<Map<ByteBuffer, ByteBuffer>> workerGetFuture = getFromStore(workerStore, keys);
+ Future<Map<ByteBuffer, ByteBuffer>> connectorGetFuture = getFromStore(connectorStore, keys);
+
+ return new Future<Map<ByteBuffer, ByteBuffer>>() {
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ // Note the use of | instead of || here; this causes cancel to be invoked on both futures,
+ // even if the first call to cancel returns true
+ return workerGetFuture.cancel(mayInterruptIfRunning)
+ | connectorGetFuture.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return workerGetFuture.isCancelled()
+ || connectorGetFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return workerGetFuture.isDone()
+ && connectorGetFuture.isDone();
+ }
+
+ @Override
+ public Map<ByteBuffer, ByteBuffer> get() throws InterruptedException, ExecutionException {
+ // Worker values go in first so that connector-specific values overwrite them for shared keys
+ Map<ByteBuffer, ByteBuffer> result = new HashMap<>(workerGetFuture.get());
+ result.putAll(connectorGetFuture.get());
+ return result;
+ }
+
+ @Override
+ public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ long timeoutMs = unit.toMillis(timeout);
+ long endTime = time.milliseconds() + timeoutMs;
+ // timeoutMs is already in milliseconds, so it must be paired with MILLISECONDS; pairing it with
+ // the caller's unit would multiply the wait (e.g. 5 SECONDS would become 5000 SECONDS)
+ Map<ByteBuffer, ByteBuffer> result = new HashMap<>(workerGetFuture.get(timeoutMs, TimeUnit.MILLISECONDS));
+ // Give the second read whatever remains of the overall budget, but always at least 1 ms
+ timeoutMs = Math.max(1, endTime - time.milliseconds());
+ result.putAll(connectorGetFuture.get(timeoutMs, TimeUnit.MILLISECONDS));
+ return result;
+ }
+ };
}
+ /**
+ * Store the specified offset key/value pairs.
+ *
+ * <p>If configured to use a connector-specific offset store, the returned
{@link Future} corresponds to a
+ * write to that store, and the passed-in {@link Callback} is invoked once
that write completes. If a worker-global
+ * store is provided, a secondary write is made to that store if the write
to the connector-specific store
+ * succeeds. Errors with this secondary write are not reflected in the
returned {@link Future} or the passed-in
+ * {@link Callback}; they are only logged as a warning to users.
+ *
+ * <p>If not configured to use a connector-specific offset store, the
returned {@link Future} corresponds to a
+ * write to the worker-global offset store, and the passed-in {@link
Callback} is invoked once that write completes.
+
+ * @param values map from key to value
+ * @param callback callback to invoke on completion of the primary write
+ * @return void future for the primary write
+ */
@Override
public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void>
callback) {
- // TODO
- return workerStore.set(values, callback);
+ final OffsetBackingStore primaryStore;
+ final OffsetBackingStore secondaryStore;
+ if (connectorStore.isPresent()) {
+ primaryStore = connectorStore.get();
+ secondaryStore = workerStore.orElse(null);
+ } else if (workerStore.isPresent()) {
+ primaryStore = workerStore.get();
+ secondaryStore = null;
+ } else {
+ // Should never happen since we check for this case in the
constructor, but just in case, this should
+ // be more informative than the NPE that would otherwise be thrown
+ throw new IllegalStateException("At least one non-null offset
store must be provided");
+ }
+
+ return primaryStore.set(values, (primaryWriteError, ignored) -> {
+ if (secondaryStore != null) {
+ if (primaryWriteError != null) {
+ log.trace("Skipping offsets write to secondary store
because primary write has failed", primaryWriteError);
Review Comment:
Why do we log the primary-store write error at `trace` level here, but the secondary-store write error at `warn` level (L290)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]