[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-02 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1214236252


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,30 +280,47 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);
+
         return primaryStore.set(values, (primaryWriteError, ignored) -> {
             if (secondaryStore != null) {
                 if (primaryWriteError != null) {
                     log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
+                    try (LoggingContext context = loggingContext()) {
+                        callback.onCompletion(primaryWriteError, ignored);
+                    }
                 } else {
                     try {
                         // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
-                        // backing store.
+                        // backing store. The only exception to this is when a batch consisting of tombstone records fails
+                        // to be written to secondary store and has been successfully written to the primary store. In this case
+                        // an error would be propagated back as in such cases, a deleted source partition
+                        // would be reported as present because the 2 stores are not in sync.
                         secondaryStore.set(values, (secondaryWriteError, ignored2) -> {
                             try (LoggingContext context = loggingContext()) {
-                                if (secondaryWriteError != null) {
+                                if (secondaryWriteError != null && containsTombstones) {
+                                    log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                                    callback.onCompletion(secondaryWriteError, ignored);
+                                    return;
+                                } else if (secondaryWriteError != null) {
                                     log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                 } else {
                                     log.debug("Successfully flushed offsets to secondary backing store");
                                 }
+                                // primaryWriteError is null at this point, and we don't care about secondaryWriteError
+                                callback.onCompletion(null, ignored);

Review Comment:
   Isn't this going to cause us to always block on writes to the secondary store, which is something we want to avoid?






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-06 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1219596953


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
 

Review Comment:
   This `ConnectorOffsetBackingStore::set` method's Javadoc also needs to be 
updated to mention the special case handling for batches with `null` offsets 
since it currently states the following:
   
   ```
 * If configured to use a connector-specific offset store, the returned {@link Future} corresponds to a
 * write to that store, and the passed-in {@link Callback} is invoked once that write completes. If a worker-global
 * store is provided, a secondary write is made to that store if the write to the connector-specific store
 * succeeds. Errors with this secondary write are not reflected in the returned {@link Future} or the passed-in
 * {@link Callback}; they are only logged as a warning to users.
   ```
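   One way the Javadoc could be extended to cover the new behaviour (the wording below is only a sketch, not text from the PR):
   
   ```
 * If the batch of offsets contains tombstone (i.e. {@code null}) values and a worker-global store is
 * configured, the write to the worker-global store is performed and waited on first; a failure of that
 * write is reflected in the returned {@link Future} and the passed-in {@link Callback}, so that the
 * offset commit fails instead of leaving the two stores out of sync.
   ```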



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);

Review Comment:
   nit: can be simplified
   ```suggestion
   boolean containsTombstones = values.containsValue(null);
   ```



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +280,33 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.entrySet()
+                .stream()
+                .anyMatch(offset -> offset.getValue() == null);
+
+        AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });
+        }
+
         return primaryStore.set(values, (primaryWriteError, ignored) -> {
-            if (secondaryStore != null) {
+            // Secondary store writes have already happened for tombstone records

Review Comment:
   How do we know this if we aren't blocking on the write to the secondary 
store above? I believe we should do a synchronous write to the secondary store 
in this tombstone offset case.
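   
   As a rough sketch of that synchronous-write idea (built on the variables already visible in the diff above - `secondaryStore`, `values`, `containsTombstones`, `callback` - plus an assumed flush timeout `offsetFlushTimeoutMs` and Connect's `FutureCallback`/`ConnectException` utilities; this is not the PR's final code):
   
   ```java
   if (secondaryStore != null && containsTombstones) {
       // Block on the worker-global store write so that a failure can fail the whole offset commit
       FutureCallback<Void> secondaryWrite = new FutureCallback<>();
       secondaryStore.set(values, secondaryWrite);
       try {
           secondaryWrite.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
       } catch (ExecutionException e) {
           // Surface the underlying write failure to the caller instead of only logging it
           callback.onCompletion(e.getCause(), null);
           throw new ConnectException("Failed to write tombstone offsets to the secondary store", e.getCause());
       } catch (TimeoutException e) {
           callback.onCompletion(e, null);
           throw new ConnectException("Timed out writing tombstone offsets to the secondary store", e);
       } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           callback.onCompletion(e, null);
           throw new ConnectException("Interrupted while writing tombstone offsets to the secondary store", e);
       }
   }
   ```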



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -302,7 +326,12 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
                 }
             }
             try (LoggingContext context = loggingContext()) {
-                callback.onCompletion(primaryWriteError, ignored);
+                Throwable secondaryWriteError = secondaryStoreTombstoneWriteError.get();
+                if (secondaryStore != null && containsTombstones && secondaryWriteError != null) {

Review Comment:
   Same as above - we aren't blocking on the write to the secondary store, so 
we can't be sure that it has completed at this point.






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-06 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1219648522


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +197,220 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+@Test

Review Comment:
   All these new tests seem to be specific to the `ConnectorOffsetBackingStore` 
class - should we move them to a new `ConnectorOffsetBackingStoreTest` class?
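   
   If they are moved, a bare-bones shape for the new class could look like this (class layout and test name are hypothetical, JUnit 5 is assumed; the real tests would carry over the mocks from `OffsetStorageWriterTest`):
   
   ```java
   import org.junit.jupiter.api.Test;
   
   public class ConnectorOffsetBackingStoreTest {
   
       @Test
       public void testSetFailsWhenTombstoneOffsetsCannotBeWrittenToSecondaryStore() {
           // 1. Build a ConnectorOffsetBackingStore from a mocked worker-global store and a mocked
           //    connector-specific store.
           // 2. Call set() with a batch containing at least one null-valued (tombstone) offset while the
           //    worker-global store is made to fail its write.
           // 3. Assert that the returned Future / supplied Callback completes exceptionally.
       }
   }
   ```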






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-19 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1233861544


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -140,6 +142,9 @@ public static ConnectorOffsetBackingStore withOnlyConnectorStore(
     private final Optional<KafkaOffsetBackingStore> connectorStore;
     private final Optional<TopicAdmin> connectorStoreAdmin;
 
+    private boolean isEOSEnabled;

Review Comment:
   ```suggestion
   private boolean exactlyOnce;
   ```
   nit: similar to 
https://github.com/apache/kafka/blob/9b7f7e0fa09121e204540b46bd3d7340a0175e7d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L152



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });

Review Comment:
   Why do we need to do this if we're anyway doing a get on the returned future 
immediately? Any producer error will be wrapped in an `ExecutionException` and 
thrown when we call get on the future returned from `KafkaOffsetBackingStore` 
(the same exception will also be used to complete the passed callback 
exceptionally) - 
https://github.com/apache/kafka/blob/546b912b831d46208499c6e1f4f785db6ddd9aa3/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L344-L414



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case of ALOS, we wait for the same duration as `offset.commit.timeout.ms`

Review Comment:
   nit: I haven't seen the term ALOS used anywhere else in Connect. I'm presuming it means at least once delivery semantics? The Connect framework doesn't guarantee at least once semantics for all source connectors - depending on the specific connector plugin implementation, we could also have at most once semantics.

[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-06-23 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1239720643


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {

Review Comment:
   ```suggestion
   public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception {
   ```
   nit: since this is just a test method, we can avoid the verbosity



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,59 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone(s)-containing to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing to secondary store interrupted, cancelling", this);

Review Comment:
   `Flush of tombstone(s)-containing to secondary store`
   
   Should this be `Flush of tombstone(s)-containing offsets batch to secondary 
store` instead? 😅 
   
   Also I'm wondering if all of these log lines need to necessarily contain 
this verbiage around tombstone containing offsets? Users probably don't really 
need to know (or care) that offset flushes are handled differently internally 
depending on whether or not they contain one or more `null` valued offsets. 
IMO, just including the details of the failure should be sufficient - WDYT?



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,59 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);

[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-07 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1255451696


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,10 +284,61 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });

Review Comment:
   I was suggesting the use of a plain `FutureCallback` instantiated via the 
default constructor. We're still creating an unnecessary callback to handle 
`secondaryWriteError` which will anyway be surfaced synchronously when we call 
`get` on our `FutureCallback`?






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-07 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1255456909


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,60 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>((secondaryWriteError, ignored) -> {

Review Comment:
   Why do we need this underlying callback to handle `secondaryWriteError`? 
Can't we just use a plain `FutureCallback` instantiated via the default 
constructor ? The error will anyway be surfaced synchronously when we call 
`get` on our `FutureCallback` (and handled in the `ExecutionException` catch 
block)?
   
   (Also mentioned in 
https://github.com/apache/kafka/pull/13801#discussion_r1255451696, but that 
comment was resolved and I don't seem to have permissions to un-resolve it).



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,60 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>((secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone(s)-containing to secondary backing store");

Review Comment:
   Same comment as https://github.com/apache/kafka/pull/13801/files#r1239708472 
(also looks like that wasn't resolved either, even though the comment was 
resolved).






[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1258189389


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.

Review Comment:
   Can we also add a sentence or two explaining the reasoning here (wording can 
be borrowed from https://issues.apache.org/jira/browse/KAFKA-15018)? 
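   
   For instance, the comment could spell out the rationale along these lines (phrasing is illustrative, borrowed from the earlier revision of this diff and the JIRA description):
   
   ```java
   // Tombstone offsets are written to the worker-global store first, and that write is waited on, because
   // if the connector-specific store accepted the tombstone but the worker-global store did not, a deleted
   // source partition would later be reported as still present when offsets are read back, i.e. the two
   // stores would be permanently out of sync.
   ```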



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   This doesn't need to be an `AtomicReference` anymore



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this);

Review Comment:
   Can we unify the language used in the log lines here and the subsequent 
catch blocks? `tombstone(s)-containing offsets`  / `offsets with tombstones` 
and `secondary store` / `secondary storage`. Also, since most users won't be 
aware of what "primary" and "secondary" stores are, I'd prefer using something 
like worker / global offsets topic instead. As per 
https://kafka.apache.org/coding-guide.html, logging is part of our "UI" 😄 
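   
   For example (purely illustrative rewording, not taken from the PR), the catch blocks could log along the lines of:
   
   ```java
   log.warn("{} Flush of offsets to the worker's global offsets topic was interrupted, cancelling", this);
   log.error("{} Failed to flush offsets to the worker's global offsets topic", this, e);
   ```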



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -192,6 +212,251 @@ public void testCancelAfterAwaitFlush() throws Exception {
 flushFuture.get(1000, TimeUnit.MILLISECONDS);
 }
 
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+            () -> LoggingContext.forConnector("source-connector"),
+ 

[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259160336


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();

Review Comment:
   Aren't we still making explicit null checks here - 
https://github.com/apache/kafka/pull/13801/files#diff-0014cfa4c32bfde215cb10bee987e997d5182815fcf1a1245539375519f83f3dR325-R326?
 🤔 
   
   I'm fine with retaining it though, this isn't a blocking comment.







[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-10 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259161121


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing offsets to secondary store interrupted, cancelling", this);

Review Comment:
   Thanks, that makes sense 👍 









[GitHub] [kafka] yashmayya commented on a diff in pull request #13801: KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records

2023-07-11 Thread via GitHub


yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259168414


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment:
   > Well the rationale it the same that EOS waits indefinitely for the commits 
to happen
   
   Prior to this change, we just waited for the producer transaction commit to 
complete since we didn't care about the writes to the secondary offsets store 
(and the transaction would contain all the data records as well as offset 
records written to the connector's primary offsets store). However, that would 
be bounded by the producer's `transaction.timeout.ms` right?
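   
   (For context, `transaction.timeout.ms` is the standard transactional-producer client config that bounds how long a transaction may stay open before the broker aborts it; a minimal, Connect-agnostic illustration of where that bound comes from:)
   
   ```java
   // Plain Kafka producer config, shown only to illustrate the bound referenced above
   Properties producerProps = new Properties();
   producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // example id
   producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000"); // transaction.timeout.ms
   ```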


