vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Review Comment:
   Thanks for this. I updated the logic.


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -254,7 +260,12 @@ public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) throws Inter
      * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global
      * store is provided, a secondary write is made to that store if the write to the connector-specific store
      * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in
-     * {@link Callback}; they are only logged as a warning to users.
+     * {@link Callback}; they are only logged as a warning to users. The only exception to this rule is when the offsets
+     * that need to be committed contains tombstone records as well. In such cases, a write consisting of only tombstone
+     * offsets would first happen on the worker-global store and only if it succeeds, would all the offsets be written
+     * to the connector-specific store and the regular offsets would be written to the worker-global store. Note that
+     * in this case, failure to write regular offsets to secondary store would still not reflect in the returned
+     * {@link Future} or the passed-in {@link Callback}

Review Comment:
   Added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
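
The write ordering described in the Javadoc above can be sketched roughly as follows. This is a simplified, synchronous illustration and not the code from the PR: `TombstoneOrderingSketch`, `OffsetStore`, and `setOffsets` are hypothetical names, and the real `ConnectorOffsetBackingStore` works with `Future`s and `Callback`s rather than blocking calls.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TombstoneOrderingSketch {

    /** Hypothetical synchronous stand-in for an offset store; not a Kafka Connect interface. */
    public interface OffsetStore {
        void write(Map<ByteBuffer, ByteBuffer> offsets);
    }

    /**
     * Writes offsets in the order the Javadoc describes:
     * 1. tombstones (null values) go to the worker-global store first; a failure here aborts everything,
     * 2. all offsets go to the connector-specific store; a failure here is also propagated,
     * 3. regular (non-null) offsets go to the worker-global store; a failure here is only logged.
     */
    public static void setOffsets(OffsetStore connectorStore,
                                  OffsetStore workerStore,
                                  Map<ByteBuffer, ByteBuffer> values) {
        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
        // A plain loop is used because HashMap.put accepts null values.
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                tombstoneOffsets.put(entry.getKey(), null);
            } else {
                regularOffsets.put(entry.getKey(), entry.getValue());
            }
        }

        if (!tombstoneOffsets.isEmpty()) {
            // Step 1: an exception thrown here propagates to the caller.
            workerStore.write(tombstoneOffsets);
        }

        // Step 2: the connector-specific store receives every offset.
        connectorStore.write(values);

        // Step 3: best-effort write of the remaining offsets to the worker-global store.
        if (!regularOffsets.isEmpty()) {
            try {
                workerStore.write(regularOffsets);
            } catch (RuntimeException e) {
                System.err.println("WARN: secondary write of regular offsets failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        OffsetStore connectorStore = offsets -> calls.add("connector-specific store, " + offsets.size() + " offset(s)");
        OffsetStore workerStore = offsets -> calls.add("worker-global store, " + offsets.size() + " offset(s)");

        Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
        values.put(ByteBuffer.wrap(new byte[] {1}), null);                            // tombstone
        values.put(ByteBuffer.wrap(new byte[] {2}), ByteBuffer.wrap(new byte[] {9})); // regular offset

        setOffsets(connectorStore, workerStore, values);
        // Prints the writes in the order they happened.
        calls.forEach(System.out::println);
    }
}
```

When `values` contains no tombstones, the sketch reduces to the pre-existing behaviour: one write to the connector-specific store followed by a best-effort write of the same offsets to the worker-global store. The sketch partitions the map with a loop rather than `Collectors.toMap` because `Collectors.toMap` throws a `NullPointerException` on null values, so it cannot collect tombstone entries.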