vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {

Review Comment:
~~I have actually removed the chaining using `CompletableFuture` and simplified the logic. I just wait on the secondary store write directly (with or without a timeout), and if the execution fails or the wait itself fails, I update the callback and return the same exception (because it's already completed). With this, there are no changes to the primary store write (apart from using `regularOffsets` when writing to the secondary store). Let me know what you think about this.~~

I re-read the comments, and it looks like with this we are going against the approach you wanted in this [comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271), i.e.

> That said, I don't love how we've made this method synchronously await the write to the secondary store. We should return a Future to the caller that corresponds to all of the offset flushes that we'd need to block on for an offset commit (i.e., the existing flush that we're performing, possibly preceded by a preemptive flush of tombstones to the secondary store).

Let me take a look at this again. Sorry about this; I had forgotten about this comment.
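For reference while I rework this, here is a minimal sketch of the ordering the quoted comment asks for: flush tombstones to the secondary store first, then write to the primary store, and hand the caller a single future covering both, without blocking inside `set`. It is not the code in this PR. The `OffsetStore` interface, the `String` keys and values, and the `recording` helper are all hypothetical simplifications; the real `OffsetBackingStore#set` takes `ByteBuffer` maps and a `Callback` and returns a plain `Future`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

class TombstoneFirstWrite {

    // Hypothetical, simplified store (an assumption for this sketch only).
    interface OffsetStore {
        CompletableFuture<Void> set(Map<String, String> offsets);
    }

    // Returns one future covering the tombstone flush (if any) and the primary write.
    static CompletableFuture<Void> set(Map<String, String> values,
                                       OffsetStore primary,
                                       OffsetStore secondary) {
        Map<String, String> tombstones = new HashMap<>();
        Map<String, String> regular = new HashMap<>();
        for (Map.Entry<String, String> e : values.entrySet()) {
            (e.getValue() == null ? tombstones : regular).put(e.getKey(), e.getValue());
        }

        // Tombstones go to the secondary store first; skip this step if there are none.
        CompletableFuture<Void> tombstoneWrite = (secondary != null && !tombstones.isEmpty())
                ? secondary.set(tombstones)
                : CompletableFuture.completedFuture(null);

        return tombstoneWrite
                // The primary write starts only if the tombstone write succeeded.
                .thenCompose(ignored -> primary.set(values))
                // Best-effort write of regular offsets to the secondary store;
                // its result is deliberately not awaited in this sketch.
                .thenRun(() -> {
                    if (secondary != null && !regular.isEmpty()) {
                        secondary.set(regular);
                    }
                });
    }

    // Hypothetical test double: records each write and optionally fails it.
    static OffsetStore recording(String name, List<String> log, boolean fail) {
        return offsets -> {
            log.add(name + ":" + new TreeSet<>(offsets.keySet()));
            CompletableFuture<Void> result = new CompletableFuture<>();
            if (fail) {
                result.completeExceptionally(new RuntimeException(name + " write failed"));
            } else {
                result.complete(null);
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("p1", null);   // tombstone
        values.put("p2", "42");   // regular offset

        List<String> log = new ArrayList<>();
        set(values, recording("primary", log, false), recording("secondary", log, false)).join();
        System.out.println(log); // [secondary:[p1], primary:[p1, p2], secondary:[p2]]

        List<String> failedLog = new ArrayList<>();
        CompletableFuture<Void> failed =
                set(values, recording("primary", failedLog, false), recording("secondary", failedLog, true));
        System.out.println(failed.isCompletedExceptionally() + " " + failedLog); // true [secondary:[p1]]
    }
}
```

The point of `thenCompose` here is that the returned future is the chained one, so a failed tombstone flush surfaces to the caller and the primary store is never touched. Calling `thenAccept` on an already-completed future and discarding the result, as in the hunk above, gives neither property.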