C0urante commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1268520485
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
Review Comment:
Probably time to update this now?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
Review Comment:
We shouldn't be using an `AtomicReference` here. The reason it's used in the
[linked
snippet](https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L430-L484)
in the `ConsumerCoordinator` class is to ensure that, in the event that
multiple errors occur, we don't overwrite the first exception that we saw.
That's not a possibility here since `secondaryStoreTombstoneWriteError` is
only ever updated in separate `catch` clauses for the same `try` block, which
means that it's guaranteed to never be updated more than once.
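A minimal sketch of the simpler alternative, assuming (as in the PR) that the value is only read back later in the same method and never captured by a lambda (which would require an effectively final variable):
```java
// Hypothetical simplification: a plain local suffices because the two catch
// clauses below are mutually exclusive for a single try block.
Throwable secondaryStoreTombstoneWriteError = null;
try {
    // Wait for the preemptive tombstone write to the secondary store
    secondaryWriteFuture.get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore interrupt status
    secondaryStoreTombstoneWriteError = e;
} catch (ExecutionException e) {
    secondaryStoreTombstoneWriteError = e.getCause();
}
if (secondaryStoreTombstoneWriteError != null) {
    callback.onCompletion(secondaryStoreTombstoneWriteError, null);
}
```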
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
Review Comment:
I like the general idea here: block indefinitely for exactly-once,
block within the offset timeout otherwise. We also note in the
[docs](https://kafka.apache.org/documentation.html#connectconfigs_offset.flush.timeout.ms)
for the `offset.flush.timeout.ms` property that it "has no effect for source
connectors running with exactly-once support".
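Concretely, the two waiting modes would look something like this (a sketch; `exactlyOnce` and `offsetFlushTimeoutMs` stand in for however this class actually surfaces those values):
```java
if (exactlyOnce) {
    // Exactly-once: offset commits have no timeout, so block until the
    // secondary-store write completes or fails.
    secondaryWriteFuture.get();
} else {
    // Otherwise, bound the wait by offset.flush.timeout.ms.
    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
}
```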
I don't think we need to worry about placing an upper bound on the time we
take with exactly-once support enabled. If we did, it would make tasks more
brittle (remember, we fail tasks when offset commits fail in this mode), and
preemptively writing tombstone records to the secondary offsets topic shouldn't
corrupt the offsets that a connector sees even if the current transaction
(including a write to the connector-specific offsets topic) fails. We may end
up writing garbage to the secondary offsets topic, but guarantees for
exactly-once support are lost as soon as a connector switches over to reading
exclusively from that topic, and tombstones in the secondary topic don't
overwrite non-tombstone offsets for the same partition in the primary topic.
That said, I don't love how we've made this method synchronously await the
write to the secondary store. We should return a `Future` to the caller that
corresponds to all of the offset flushes that we'd need to block on for an
offset commit (i.e., the existing flush that we're performing, possibly
preceded by a preemptive flush of tombstones to the secondary store).
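One possible shape for that (a sketch, not the PR's implementation: `setAsync` is a hypothetical adapter, and `CompletableFuture` is used purely for illustration where the real code would likely build on Connect's own `FutureCallback` utilities):
```java
// Hypothetical adapter: wrap a store's callback-based set() in a
// CompletableFuture so the two flushes can be sequenced.
private CompletableFuture<Void> setAsync(OffsetBackingStore store, Map<ByteBuffer, ByteBuffer> values) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    store.set(values, (error, ignored) -> {
        if (error != null)
            result.completeExceptionally(error);
        else
            result.complete(null);
    });
    return result;
}

public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
    // Tombstones: preemptively flush the secondary store first; otherwise
    // skip straight to the primary flush.
    CompletableFuture<Void> firstFlush = (secondaryStore != null && values.containsValue(null))
            ? setAsync(secondaryStore, values)
            : CompletableFuture.completedFuture(null);
    // The caller gets back a single Future covering both writes, and can
    // block on it at offset-commit time instead of blocking here.
    return firstFlush
            .thenCompose(ignored -> setAsync(primaryStore, values))
            .whenComplete((ignored, error) -> {
                if (callback != null)
                    callback.onCompletion(error, null);
            });
}
```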
##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
+    @Test
Review Comment:
+1 for moving these tests to a `ConnectorOffsetBackingStoreTest`; the
changes to the main code are entirely contained within the
`ConnectorOffsetBackingStore` class.