C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1417677509


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.

Review Comment:
   This largely duplicates information present in the Javadoc for the method. 
IMO we can remove this entire comment block and, if we really want to make it 
easy for maintainers to understand not just the "what" but the "why" for this 
logic, we can link to https://issues.apache.org/jira/browse/KAFKA-15018 in the 
Javadoc for the method:
   
   ```java
        *
        * @see <a 
href="https://issues.apache.org/jira/browse/KAFKA-15018";>KAFKA-15018</a> for 
context on the three-step
        * write sequence
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());

Review Comment:
   Nit:
   ```suggestion
                       log.error("{} Failed to flush tombstone offsets to 
secondary store", this, e.getCause());
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -299,12 +349,11 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
                     } catch (Exception e) {
                         log.warn("Failed to write offsets to secondary backing 
store", e);
                     }
+                    callback.onCompletion(null, null);
                 }
             }
-            try (LoggingContext context = loggingContext()) {
-                callback.onCompletion(primaryWriteError, ignored);
-            }
-        });

Review Comment:
   Why is this part moved? It seems fine where it was, unless we're worried 
about dangling callbacks caused by potential exceptions being thrown above. But 
it also doesn't look like the new implementation is any better at preventing 
that from happening.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -254,7 +260,12 @@ public Map<ByteBuffer, ByteBuffer> get(long timeout, 
TimeUnit unit) throws Inter
      * write to that store, and the passed-in {@link Callback} is invoked once 
that write completes. If a worker-global
      * store is provided, a secondary write is made to that store if the write 
to the connector-specific store
      * succeeds. Errors with this secondary write are not reflected in the 
returned {@link Future} or the passed-in
-     * {@link Callback}; they are only logged as a warning to users.
+     * {@link Callback}; they are only logged as a warning to users. The only 
exception to this rule is when the offsets
+     * that need to be committed contains tombstone records as well. In such 
cases, a write consisting of only tombstone
+     * offsets would first happen on the worker-global store and only if it 
succeeds, would all the offsets be written
+     * to the connector-specific store and the regular offsets would be 
written to the worker-global store. Note that
+     * in this case, failure to write regular offsets to secondary store would 
still not reflect in the returned
+     * {@link Future} or the passed-in {@link Callback}

Review Comment:
   Some suggestions on making this clearer and easier to read (begins at line 
262, which I can't include in a suggestion block in GitHub):
   ```suggestion
        * succeeds.
        * <p>
        * Normally, errors with this secondary write are not reflected in the 
returned {@link Future} or the passed-in
        * {@link Callback}; they are only logged as a warning to users. The 
only exception to this rule is when the
        * offsets that need to be committed contain tombstone records.
        * <p>When the to-be-committed offsets contain tombstones, offset 
commits take place in three phases:
        * <ol>
        *     <li>First, only the tombstone offsets are written to the 
worker-global store. Failures during this step will
        *     be reflected in the returned {@link Future} and reported to the 
passed-in {@link Callback}.</li>
        *     <li>If and only if the previous write to the worker-global store 
succeeded, all offsets (both tombstones and
        *     non-tombstones) are written to the connector-specific store. 
Failures during this step will also be
        *     reflected in the returned {@link Future} and reported to the 
passed-in {@link Callback}.</li>
        *     <li>Finally, if and only if the previous write to the 
connector-specific store succeeded, all offsets with
        *     non-tombstone values are written to the worker-global store. 
Failures during this step will only be reported
        *     as warning log messages, and will not be reflected in the 
returned {@link Future} or reported to the
        *     passed-in {@link Callback}.</li>
        * </ol>
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));

Review Comment:
   We can simplify this:
   
   ```java
           Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
           Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
           values.forEach((partition, offset) -> {
               if (offset == null)
                   tombstoneOffsets.put(partition, null);
               else
                   regularOffsets.put(partition, offset);
           });
   ```
   
   (Will also require removing some imports that become unnecessary in order to 
satisfy Checkstyle)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+                    callback.onCompletion(e.getCause(), null);
+                } catch (Exception e) {

Review Comment:
   We should probably catch `Throwable` here so as not to leave the passed-in 
`callback` dangling, right?
   ```suggestion
                   } catch (Throwable t) {
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+                    }

Review Comment:
   Probably couldn't hurt to add a log message for the positive case here:
   ```suggestion
                       }
                       log.debug("Successfully flushed tombstone offsets to 
secondary store");
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+                    callback.onCompletion(e.getCause(), null);
+                } catch (Exception e) {
+                    log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);

Review Comment:
   Nit:
   ```suggestion
                       log.error("{} Failed to flush tombstone offsets to 
secondary store", this, e);
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());

Review Comment:
   We don't need to construct a new `FutureCallback` here, we can just inline a 
no-op `Callback`:
   ```suggestion
                   Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, (t, r) -> { });
   ```
   
   Also fine to add a utility method to the `Callback` interface if we feel 
like being fancy:
   ```java
   public interface Callback<V> {
       static <T> Callback<T> noOp() {
           return (error, result) -> { };
       }
   }
   ```
   
   and then reference it here with:
   ```suggestion
                   Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, Callback.noOp());
   
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
+                Future<Void> secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+                    callback.onCompletion(e.getCause(), null);
+                } catch (Exception e) {
+                    log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);
+                    callback.onCompletion(e, null);
+                }
+            });
+        }
+        offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new 
FutureCallback<>((primaryWriteError, ignored) -> {

Review Comment:
   We also need to update `offsetWriteFuture` to the return value of 
`CompletableFuture::thenAccept` (or `completableFuture::thenRun`) here:
   ```suggestion
           offsetWriteFuture = offsetWriteFuture.thenRun(() -> 
primaryStore.set(values, new FutureCallback<>((primaryWriteError, ignored) -> {
   ```
   
   We should probably also add tests that ensure that the future returned from 
`ConnectorOffsetBackingStore::set` doesn't return until all necessary writes to 
the primary and secondary stores have completed (with the exception of 
non-tombstone writes to the secondary store).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> 
values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset 
store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to 
the primary store.
+        // This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {

Review Comment:
   This causes the passed-in lambda to be executed after `offsetWriteFuture` 
completes, but it doesn't cause `offsetWriteFuture::get` to block on the 
completion of that lambda, which means that if the caller of 
`ConnectorOffsetBackingStore::set` waits for the returned `Future` to complete 
right now, it'll complete immediately even though it's likely no offsets will 
have been written to either the primary or secondary store.
   
   Also, nit: when researching this behavior, I ran into 
`CompletableFuture::thenRun`, which is a better fit than 
`CompletableFuture::thenAccept` (since we don't use the return value).
   
   ```suggestion
               offsetWriteFuture = offsetWriteFuture.thenRun(() -> {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to