vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,15 +290,54 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer>
values, Callback<Void> callb
throw new IllegalStateException("At least one non-null offset
store must be provided");
}
- return primaryStore.set(values, (primaryWriteError, ignored) -> {
+ List<ByteBuffer> partitionsWithTombstoneOffsets =
values.entrySet().stream()
+ .filter(offsetEntry -> offsetEntry.getValue() == null)
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+
+ Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+ for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+ tombstoneOffsets.put(partition, null);
+ }
+ Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+ .filter(offsetEntry -> offsetEntry.getValue() != null)
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
Review Comment:
Thanks for this. I updated the logic.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -254,7 +260,12 @@ public Map<ByteBuffer, ByteBuffer> get(long timeout,
TimeUnit unit) throws Inter
* write to that store, and the passed-in {@link Callback} is invoked once
that write completes. If a worker-global
* store is provided, a secondary write is made to that store if the write
to the connector-specific store
* succeeds. Errors with this secondary write are not reflected in the
returned {@link Future} or the passed-in
- * {@link Callback}; they are only logged as a warning to users.
+ * {@link Callback}; they are only logged as a warning to users. The only
exception to this rule is when the offsets
+ * that need to be committed contain tombstone records as well. In such
cases, a write consisting of only tombstone
+ * offsets would first happen on the worker-global store and only if it
succeeds, would all the offsets be written
+ * to the connector-specific store and the regular offsets would be
written to the worker-global store. Note that
+ * in this case, failure to write regular offsets to secondary store would
still not reflect in the returned
+ * {@link Future} or the passed-in {@link Callback}
Review Comment:
Added.
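
For readers following the thread, the write ordering described in the Javadoc above can be sketched roughly as below. This is only an illustration, not the code in the PR: the class `TombstoneWriteOrderSketch`, the holder `SplitOffsets`, and the step labels are hypothetical names, and the sketch assumes both a connector-specific (primary) and a worker-global (secondary) store are configured. It also splits the map in a single loop, since `Collectors.toMap` throws a `NullPointerException` on null values and so cannot collect the tombstone entries directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TombstoneWriteOrderSketch {

    /** Hypothetical holder for the two halves of an offsets map. */
    static final class SplitOffsets {
        final Map<ByteBuffer, ByteBuffer> tombstones = new HashMap<>();
        final Map<ByteBuffer, ByteBuffer> regular = new HashMap<>();
    }

    /** Splits offsets into tombstones (null values) and regular offsets in one pass. */
    static SplitOffsets split(Map<ByteBuffer, ByteBuffer> values) {
        SplitOffsets result = new SplitOffsets();
        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
            if (entry.getValue() == null) {
                // HashMap permits null values, unlike Collectors.toMap.
                result.tombstones.put(entry.getKey(), null);
            } else {
                result.regular.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    /**
     * Returns the order of writes the Javadoc describes, as labels only.
     * Assumes both stores exist; no real store is written to here.
     */
    static List<String> plannedWrites(Map<ByteBuffer, ByteBuffer> values) {
        SplitOffsets split = split(values);
        List<String> steps = new ArrayList<>();
        if (!split.tombstones.isEmpty()) {
            // Tombstones go to the worker-global store first; a failure here fails the whole write.
            steps.add("secondary:tombstones");
        }
        // All offsets (tombstones included) go to the connector-specific store.
        steps.add("primary:all");
        if (!split.regular.isEmpty()) {
            // Regular offsets go to the worker-global store; errors here are only logged.
            steps.add("secondary:regular");
        }
        return steps;
    }

    private static ByteBuffer buf(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
        values.put(buf("partition-a"), buf("offset-1"));
        values.put(buf("partition-b"), null); // tombstone
        System.out.println(plannedWrites(values));
    }
}
```

When the map has no tombstones, the sketch falls back to the pre-existing order: the primary write first, then the secondary write.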