vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1287096186


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,52 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenApply(v -> {
+                Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause());
+                } catch (Exception e) {
+                    log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e);
+                }

Review Comment:
   That's a great catch. I need to catch it to keep the compiler happy (`get()` throws a couple of checked exceptions). Actually, I realised my tests were doing the asserts incorrectly. I have made further changes as follows:
   
   1. Replaced `thenApply` with `thenAccept` when chaining CompletableFutures. This makes sense since we don't really need to do any transformation, just perform an action (writing to the primary/secondary store).
   2. I now return the CompletableFuture object directly and invoke the passed callback's onComplete at the appropriate places. Also note that for error cases, it doesn't seem possible to throw checked exceptions from any of these chaining methods like `thenAccept`; we need to wrap them in a RuntimeException and handle them via `exceptionally` or `whenComplete`. I haven't committed to either of the two approaches yet, since eventually the errors need to be reflected on the callback.
   3. Changed the tests so that I now assert errors and results directly within the callback. I believe this is closer to the actual changes.
   
   Let me know what you think.
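   
   For what it's worth, here is a rough standalone sketch of points 1 and 2, i.e. chaining with `thenAccept`, wrapping the checked exception from the blocking write, and letting `whenComplete` drive the caller's callback. This is a toy example, not the actual `ConnectorOffsetBackingStore` code; the `blockingSecondaryWrite` / `set` names are made up for illustration:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.atomic.AtomicReference;
   
   public class ChainingSketch {
   
       // Hypothetical stand-in for a blocking secondary-store write that can
       // throw a checked exception, the way Future.get() does.
       static void blockingSecondaryWrite(boolean fail) throws ExecutionException {
           if (fail) {
               throw new ExecutionException(new RuntimeException("secondary store write failed"));
           }
       }
   
       // Chains the write with thenAccept (no transformation needed, so no
       // thenApply) and wraps the checked exception in a RuntimeException so it
       // can propagate through the returned future.
       static CompletableFuture<Void> set(boolean failSecondary) {
           CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
           return offsetWriteFuture.thenAccept(v -> {
               try {
                   blockingSecondaryWrite(failSecondary);
               } catch (ExecutionException e) {
                   // Checked exceptions can't escape thenAccept's lambda, so
                   // rethrow the cause wrapped in an unchecked exception.
                   throw new RuntimeException(e.getCause());
               }
           });
       }
   
       public static void main(String[] args) {
           // The caller-supplied callback observes either the error or the
           // (null) result via whenComplete on the returned future.
           AtomicReference<Throwable> observedError = new AtomicReference<>();
           set(true).whenComplete((result, error) -> observedError.set(error));
           System.out.println(observedError.get() != null);
   
           AtomicReference<Throwable> noError = new AtomicReference<>();
           set(false).whenComplete((result, error) -> noError.set(error));
           System.out.println(noError.get() == null);
       }
   }
   ```
   
   Since the future returned by `set` is already completed here, `whenComplete` runs synchronously; on the failure path it sees a `CompletionException` wrapping the `RuntimeException`, which is where the "reflect errors on the callback" part would hook in.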



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
