vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey).collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner, and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenAccept(v -> {
Review Comment:
I have removed the chaining via `CompletableFuture` and simplified the logic. I now wait on the secondary store write directly (with or without a timeout, depending on `exactlyOnce`), and if the write or the wait itself fails, I invoke the callback with the exception and return the future (which has already completed).
```
if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
    Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, (t, r) -> { });
    try {
        if (exactlyOnce) {
            secondaryWriteFuture.get();
        } else {
            secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
        }
        log.debug("Successfully flushed tombstone offsets to secondary store");
    } catch (ExecutionException e) {
        log.error("{} Failed to flush tombstone offsets to secondary store", this, e.getCause());
        callback.onCompletion(e.getCause(), null);
        return secondaryWriteFuture;
    } catch (Throwable e) {
        log.error("{} Failed to flush tombstone offsets to secondary store", this, e);
        callback.onCompletion(e, null);
        return secondaryWriteFuture;
    }
}
```
With this, the write path to the primary store is unchanged (apart from using `regularOffsets` when writing to the secondary store). Let me know what you think about this.
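As a side note, the tombstone/regular split in the diff can be sketched standalone. This is a hypothetical simplification (using `String` keys instead of `ByteBuffer`, outside the actual `ConnectorOffsetBackingStore` class) that also shows why the tombstone map is built with a loop rather than a stream: `Collectors.toMap` throws a `NullPointerException` on null values, while `HashMap.put` accepts them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical standalone sketch of the tombstone/regular offset split; the
// real code operates on Map<ByteBuffer, ByteBuffer>.
public class OffsetSplitSketch {

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("partition-1", "offset-42");
        values.put("partition-2", null); // tombstone: clears the offset for this partition

        // Tombstone offsets: entries whose value is null. Built with a plain
        // loop because Collectors.toMap rejects null values.
        Map<String, String> tombstoneOffsets = new HashMap<>();
        for (Map.Entry<String, String> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                tombstoneOffsets.put(entry.getKey(), null);
            }
        }

        // Regular offsets: values are non-null, so Collectors.toMap is safe here.
        Map<String, String> regularOffsets = values.entrySet().stream()
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println("tombstones=" + tombstoneOffsets.keySet());
        System.out.println("regular=" + regularOffsets.keySet());
    }
}
```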
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]