[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-08-07 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1286347860


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,52 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
 
-        return primaryStore.set(values, (primaryWriteError, ignored) -> {
+        List<ByteBuffer> partitionsWithTombstoneOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() == null)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+
+        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
+        for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+            tombstoneOffsets.put(partition, null);
+        }
+        Map<ByteBuffer, ByteBuffer> regularOffsets = values.entrySet().stream()
+                .filter(offsetEntry -> offsetEntry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        // If the supplied offsets contain tombstone values, then tombstone offsets are extracted first,
+        // written to the secondary store in a synchronous manner and then to the primary store.
+        // This is because, if a tombstone offset is successfully written to the per-connector offsets topic,
+        // but cannot be written to the global offsets topic, then the global offsets topic will still contain that
+        // source offset, but the per-connector topic will not. Due to the fallback-on-global logic used by the worker,
+        // if a task requests offsets for one of the tombstoned partitions, the worker will provide it with the
+        // offsets present in the global offsets topic, instead of indicating to the task that no offsets can be found.
+        CompletableFuture<Void> offsetWriteFuture = CompletableFuture.completedFuture(null);
+        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+            offsetWriteFuture.thenApply(v -> {
+                Future<Void> secondaryWriteFuture = secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+                try {
+                    if (exactlyOnce) {
+                        secondaryWriteFuture.get();
+                    } else {
+                        secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                    }
+                } catch (ExecutionException e) {
+                    log.error("{} Flush of tombstone(s) offsets to secondary store threw an unexpected exception: ", this, e.getCause());
+                } catch (Exception e) {
+                    log.error("{} Got Exception when trying to flush tombstone(s) offsets to secondary store", this, e);
+                }

Review Comment:
   Why are we catching these exceptions at all? Shouldn't we be throwing them 
so that the offset commit fails?
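
A minimal sketch of what the reviewer is suggesting: instead of catching and logging, the wait on the secondary-store write rethrows so the offset commit fails. This is an illustrative stand-in, not the actual Connect code; a `RuntimeException` is used here to stay self-contained where Connect itself would likely throw a `ConnectException`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: propagate secondary-store write failures so the
// offset commit fails, rather than logging and continuing.
public class FailFastFlush {
    static void awaitSecondaryWrite(Future<Void> write, boolean exactlyOnce, long timeoutMs) {
        try {
            if (exactlyOnce) {
                write.get();                                   // block indefinitely for EOS
            } else {
                write.get(timeoutMs, TimeUnit.MILLISECONDS);   // bounded wait otherwise
            }
        } catch (ExecutionException e) {
            // Rethrow the underlying cause so the caller's offset commit fails
            throw new RuntimeException("Failed to flush tombstone offsets to secondary store", e.getCause());
        } catch (InterruptedException | TimeoutException e) {
            throw new RuntimeException("Failed to flush tombstone offsets to secondary store", e);
        }
    }
}
```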



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-26 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1275098764


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+@Test

Review Comment:
   Sorry, the point isn't just that we should have a new test suite with the 
same exact tests in it, it's that we should be testing the semantics of the 
`ConnectorOffsetBackingStore` class directly. We shouldn't be creating 
`OffsetStorageWriter` instances in these tests, we should be invoking, e.g., 
`ConnectorOffsetBackingStore::set` directly.
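
To illustrate the testing style the reviewer asks for, here is a toy in-memory stand-in (not the real `ConnectorOffsetBackingStore`; all names are hypothetical) whose `set` method can be exercised directly in a test, including tombstone handling, without any `OffsetStorageWriter` in between:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical in-memory offset store used only to show invoking set()
// directly in a test rather than going through an OffsetStorageWriter.
public class InMemoryOffsetStore {
    private final Map<String, String> offsets = new HashMap<>();

    Future<Void> set(Map<String, String> values) {
        // A null value is a tombstone: remove any stored offset for the key.
        values.forEach((k, v) -> {
            if (v == null) offsets.remove(k);
            else offsets.put(k, v);
        });
        return CompletableFuture.completedFuture(null);
    }

    String get(String key) {
        return offsets.get(key);
    }
}
```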






[GitHub] [kafka] C0urante commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-19 Thread via GitHub


C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1268520485


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 

Review Comment:
   Probably time to update this now?



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   We shouldn't be using an `AtomicReference` here. The reason it's used in the 
[linked 
snippet](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484)
 in the `ConsumerCoordinator` class is to ensure that, in the event that 
multiple errors occur, we don't overwrite the first exception that we saw.
   
   That's not a possibility here since `secondaryStoreTombstoneWriteError` is 
only ever updated in separate `catch` clauses for the same `try` block, which 
means that it's guaranteed to never be updated more than once.
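
The reviewer's point can be shown with a small sketch: when the error slot can only ever be assigned by one of several mutually exclusive `catch` clauses of the same `try` block, a plain local suffices; `AtomicReference` only earns its keep when concurrent callbacks might race to record the first error. Names below are illustrative.

```java
// Hypothetical sketch: capture at most one error without an AtomicReference.
public class SingleErrorCapture {
    static Throwable captureError(Runnable write) {
        Throwable error = null;          // plain local; at most one catch clause runs
        try {
            write.run();
        } catch (RuntimeException e) {
            error = e;
        } catch (Error e) {
            error = e;
        }
        return error;
    }
}
```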



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
Review Comment:
   I like the general idea here right now: block indefinitely for exactly-once, 
block within the offset timeout otherwise. We also note in the 
[docs](https://kafka.apache.org/documentation.html#connectconfigs_offset.flush.timeout.ms)
 for the `offset.flush.timeout.ms` property that it "has no effect for source 
connectors running with exactly-once support".
   
   I don't think we need to worry about placing an upper bound on the time we 
take with exactly-once support enabled. If we did, it would make tasks more 
brittle (remember, we fail tasks when offset commits fail in this mode), and 
preemptively writing tombstone records to the secondary offsets topic shouldn't 
corrupt the offsets that a connector sees even if the current transaction 
(including a write to the connector-specific offsets topic) fails. We may end 
up writing garbage to the secondary offsets topic, but guarantees for 
exactly-once support are lost as soon as a connector switches over to reading 
exclusively from that topic, and tombstones in the secondary topic don't 
overwrite non-tombstone offsets for the same partition in the primary topic.
   
   That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a `Future` to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
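
The shape the reviewer describes could look roughly like the following sketch: one `Future` returned to the caller that only completes once both the preemptive tombstone flush to the secondary store and the primary flush have completed. This is a simplified illustration using bare `CompletableFuture`s, not the actual Connect implementation.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: chain the two offset flushes so the caller can await
// a single Future instead of set() blocking on the secondary store itself.
public class ChainedOffsetFlush {
    static CompletableFuture<Void> set(CompletableFuture<Void> secondaryTombstoneFlush,
                                       CompletableFuture<Void> primaryFlush) {
        // thenCompose orders completion: the result only succeeds after the
        // secondary tombstone flush succeeds and the primary flush completes;
        // a secondary failure fails the whole chain.
        return secondaryTombstoneFlush.thenCompose(ignored -> primaryFlush);
    }
}
```

A caller (e.g. the offset commit path) would then block on, or time-bound, this single returned future.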



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+@Test

Review Comment:
   +1 for moving these tests to a `ConnectorOffsetBackingStoreTest`; the 
changes to the main code are entirely contained within the