C0urante commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r901682079


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -63,28 +70,109 @@
 public class KafkaOffsetBackingStore implements OffsetBackingStore {
     private static final Logger log = 
LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
 
-    private KafkaBasedLog<byte[], byte[]> offsetLog;
-    private HashMap<ByteBuffer, ByteBuffer> data;
+    /**
+     * Build a connector-specific offset store with read and write support. 
The producer will be {@link Producer#close(Duration) closed}
+     * and the consumer will be {@link Consumer#close(Duration) closed} when 
this store is {@link #stop() stopped}, but the topic admin
+     * must be {@link TopicAdmin#close(Duration) closed} by the caller.
+     * @param topic the name of the offsets topic to use
+     * @param producer the producer to use for writing to the offsets topic
+     * @param consumer the consumer to use for reading from the offsets topic
+     * @param topicAdmin the topic admin to use for creating and querying 
metadata for the offsets topic
+     * @return an offset store backed by the given topic and Kafka clients
+     */
+    public static KafkaOffsetBackingStore forTask(
+            String topic,
+            Producer<byte[], byte[]> producer,
+            Consumer<byte[], byte[]> consumer,
+            TopicAdmin topicAdmin
+    ) {
+        return new KafkaOffsetBackingStore(() -> topicAdmin) {
+            @Override
+            public void configure(final WorkerConfig config) {
+                exactlyOnce = config.exactlyOnceSourceEnabled();
+                offsetLog = KafkaBasedLog.withExistingClients(
+                        topic,
+                        consumer,
+                        producer,
+                        topicAdmin,
+                        consumedCallback,
+                        Time.SYSTEM,
+                        initialize(topic, newTopicDescription(topic, config))
+                );
+            }
+        };
+    }
+
+    /**
+     * Build a connector-specific offset store with read-only support. The 
consumer will be {@link Consumer#close(Duration) closed}
+     * when this store is {@link #stop() stopped}, but the topic admin must be 
{@link TopicAdmin#close(Duration) closed} by the caller.
+     * @param topic the name of the offsets topic to use
+     * @param consumer the consumer to use for reading from the offsets topic
+     * @param topicAdmin the topic admin to use for creating and querying 
metadata for the offsets topic
+     * @return a read-only offset store backed by the given topic and Kafka 
clients
+     */
+    public static KafkaOffsetBackingStore forConnector(
+            String topic,
+            Consumer<byte[], byte[]> consumer,
+            TopicAdmin topicAdmin
+    ) {
+        return new KafkaOffsetBackingStore(() -> topicAdmin) {
+            @Override
+            public void configure(final WorkerConfig config) {
+                exactlyOnce = config.exactlyOnceSourceEnabled();
+                offsetLog = KafkaBasedLog.withExistingClients(
+                        topic,
+                        consumer,
+                        null,
+                        topicAdmin,
+                        consumedCallback,
+                        Time.SYSTEM,
+                        initialize(topic, newTopicDescription(topic, config))
+                );
+            }
+        };
+    }
+
+    protected KafkaBasedLog<byte[], byte[]> offsetLog;
+    private final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
     private SharedTopicAdmin ownTopicAdmin;
+    protected boolean exactlyOnce;
 
+    /**
+     * Create an {@link OffsetBackingStore} backed by a Kafka topic. This 
constructor will cause the
+     * store to instantiate and close its own {@link TopicAdmin} during {@link 
#configure(WorkerConfig)}
+     * and {@link #stop()}, respectively.
+     *
+     * @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
+     */
     @Deprecated
     public KafkaOffsetBackingStore() {
         this.topicAdminSupplier = null;
     }
 
+    /**
+     * Create an {@link OffsetBackingStore} backed by a Kafka topic. This 
constructor will use the given
+     * {@link Supplier} to acquire a {@link TopicAdmin} that will be used for 
interactions with the backing
+     * Kafka topic. The caller is expected to manage the lifecycle of that 
object, including
+     * {@link TopicAdmin#close(Duration) closing} it when it is no longer 
needed.
+     * @param topicAdmin a {@link Supplier} for the {@link TopicAdmin} to use 
for this backing store;
+     *                   may not be null, and may not return null
+     */
     public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdmin) {
         this.topicAdminSupplier = Objects.requireNonNull(topicAdmin);
     }
 
+
     @Override
     public void configure(final WorkerConfig config) {
         String topic = 
config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
         if (topic == null || topic.trim().length() == 0)
             throw new ConfigException("Offset storage topic must be 
specified");
 
+        exactlyOnce = config.exactlyOnceSourceEnabled();

Review Comment:
   Used 
[here](https://github.com/C0urante/kafka/blob/ef5a43b70bc1172636cce43de0fcf493825da24d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L258-L267).
   
   Added `this.` on writes to this field and others in the `configure` method 
to make it easier to identify instance vs. function variables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to