vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Review Comment:
   Thanks for this. I updated the logic.
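
   For reference, a minimal sketch of one way the split in the hunk above could be done in a single pass. This is not necessarily the logic that ended up in the PR; `OffsetSplitSketch`, `SplitOffsets`, and `split` are hypothetical names used only for illustration. A plain loop is used because `Collectors.toMap` rejects null values, so tombstone entries cannot be collected that way.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the PR: splits offsets in one pass.
class OffsetSplitSketch {

    // Hypothetical holder for the two halves of the split.
    static final class SplitOffsets {
        final Map<ByteBuffer, ByteBuffer> tombstones = new HashMap<>();
        final Map<ByteBuffer, ByteBuffer> regular = new HashMap<>();
    }

    // Entries with a null value are treated as tombstones; all others are regular offsets.
    static SplitOffsets split(Map<ByteBuffer, ByteBuffer> values) {
        SplitOffsets result = new SplitOffsets();
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                // HashMap permits null values, unlike Collectors.toMap.
                result.tombstones.put(entry.getKey(), null);
            } else {
                result.regular.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

   This avoids building the intermediate list of tombstone partitions and streaming over `values` twice.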



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -254,7 +260,12 @@ public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) throws Inter
      * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global
      * store is provided, a secondary write is made to that store if the write to the connector-specific store
      * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in
-     * {@link Callback}; they are only logged as a warning to users.
+     * {@link Callback}; they are only logged as a warning to users. The only exception to this rule is when the offsets
+     * that need to be committed contains tombstone records as well. In such cases, a write consisting of only tombstone
+     * offsets would first happen on the worker-global store and only if it succeeds, would all the offsets be written
+     * to the connector-specific store and the regular offsets would be written to the worker-global store. Note that
+     * in this case, failure to write regular offsets to secondary store would still not reflect in the returned
+     * {@link Future} or the passed-in {@link Callback}

Review Comment:
   Added.
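
   To make the ordering in the updated Javadoc concrete, here is a simplified, synchronous sketch. It is only an illustration under stated assumptions: the `Store` interface and `TombstoneWriteOrderSketch` class are hypothetical, both stores are assumed to be non-null, and the actual `set` method works with `Future` and `Callback` rather than blocking calls.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, synchronous illustration of the write order described in the Javadoc.
class TombstoneWriteOrderSketch {

    // Hypothetical stand-in for an offset store; the real API is asynchronous.
    interface Store {
        void write(Map<ByteBuffer, ByteBuffer> offsets) throws Exception;
    }

    // primary = connector-specific store, secondary = worker-global store (both assumed non-null).
    static void set(Map<ByteBuffer, ByteBuffer> values, Store primary, Store secondary) throws Exception {
        Map<ByteBuffer, ByteBuffer> tombstones = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> regular = new HashMap<>();
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                tombstones.put(entry.getKey(), null);
            } else {
                regular.put(entry.getKey(), entry.getValue());
            }
        }

        // Step 1: tombstones go to the worker-global store first.
        // A failure here propagates to the caller and nothing else is written.
        if (!tombstones.isEmpty()) {
            secondary.write(tombstones);
        }

        // Step 2: all offsets go to the connector-specific store; a failure propagates.
        primary.write(values);

        // Step 3: regular offsets go to the worker-global store.
        // A failure here is only reported as a warning, never surfaced to the caller.
        if (!regular.isEmpty()) {
            try {
                secondary.write(regular);
            } catch (Exception e) {
                System.err.println("WARN: failed to write regular offsets to secondary store: " + e);
            }
        }
    }
}
```

   When `values` contains no tombstones, step 1 is skipped and the flow reduces to the original behaviour: write to the connector-specific store, then a best-effort write to the worker-global store.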



