C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280

Thanks for the in-depth analysis! I think part of the problem here stems from trying to make our internal `Future`-based API cooperate with Java's `CompletableFuture` API. I've sketched out something below that doesn't rely on `CompletableFuture` and (I believe) preserves the semantics we want for asynchronicity:
```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.LoggingContext;

// Note: connectorStore, workerStore, log, and the loggingContext() helper are the
// members already defined on ConnectorOffsetBackingStore; their declarations are
// elided here.
public class ConnectorOffsetBackingStore implements OffsetBackingStore {

    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        final OffsetBackingStore primaryStore;
        final OffsetBackingStore secondaryStore;
        if (connectorStore.isPresent()) {
            primaryStore = connectorStore.get();
            secondaryStore = workerStore.orElse(null);
        } else if (workerStore.isPresent()) {
            primaryStore = workerStore.get();
            secondaryStore = null;
        } else {
            // Should never happen since we check for this case in the constructor, but just in case,
            // this should be more informative than the NPE that would otherwise be thrown
            throw new IllegalStateException("At least one non-null offset store must be provided");
        }

        // Split the batch into tombstones (null offsets) and regular offsets
        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset == null) {
                tombstoneOffsets.put(partition, null);
            } else {
                regularOffsets.put(partition, offset);
            }
        });

        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
            // Write tombstones to the secondary store first, and only move on to the
            // primary store once that write has succeeded; the remaining writes are
            // chained through this callback
            return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                if (tombstoneWriteError != null) {
                    log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                    try (LoggingContext context = loggingContext()) {
                        callback.onCompletion(tombstoneWriteError, ignored);
                    }
                    return;
                }

                setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
            });
        } else {
            return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
        }
    }

    private Future<Void> setPrimaryThenSecondary(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
            Callback<Void> callback
    ) {
        return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
            if (secondaryStore != null) {
                if (primaryWriteError != null) {
                    log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                } else {
                    try {
                        // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes to this
                        // backing store.
                        secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                            try (LoggingContext context = loggingContext()) {
                                if (secondaryWriteError != null) {
                                    log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                } else {
                                    log.debug("Successfully flushed offsets to secondary backing store");
                                }
                            }
                        });
                    } catch (Exception e) {
                        log.warn("Failed to write offsets to secondary backing store", e);
                    }
                }
            }
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(primaryWriteError, ignored);
            }
        });
    }
}
```
   
It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs` fields.
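
The flush timeout still applies, just at the call site rather than inside the store. Roughly, and with hypothetical names (`offsetStore`, `offsetsToFlush`, `offsetFlushTimeoutMs`), whoever awaits the returned future enforces its own timeout:

```java
// Hypothetical caller-side sketch: the store itself never blocks, so the
// flush timeout is enforced by whoever awaits the returned future
Future<Void> flushFuture = offsetStore.set(offsetsToFlush, (error, ignored) -> { });
flushFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS); // may throw TimeoutException
```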
   
If this looks acceptable, I think the only question left is whether out-of-order writes are possible because of how things are chained. I believe this is only a problem for non-exactly-once source tasks (since we only have at most one in-flight offset commit at a time when exactly-once support is enabled), and should be handled gracefully by `OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of eyes to make sure.
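
For concreteness, here's a toy, self-contained illustration of the hazard I have in mind (this is not Connect code; the single-threaded executors and names are invented). The extra tombstone hop means a later commit with no tombstones can reach the primary store before an earlier one:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OutOfOrderWritesDemo {
    public static void main(String[] args) throws InterruptedException {
        // Each store applies writes on its own single thread
        ExecutorService secondaryStore = Executors.newSingleThreadExecutor();
        ExecutorService primaryStore = Executors.newSingleThreadExecutor();

        // Commit 1 contains a tombstone, so it takes the extra secondary-store
        // hop before its primary write is issued
        secondaryStore.submit(() -> {
            sleepQuietly(50); // simulate a slow tombstone write
            primaryStore.submit(() -> System.out.println("commit 1 reaches primary store"));
        });

        // Commit 2 has no tombstones and goes straight to the primary store
        primaryStore.submit(() -> System.out.println("commit 2 reaches primary store"));

        // Prints "commit 2" before "commit 1": the writes land in the opposite
        // order from which the commits were initiated
        secondaryStore.shutdown();
        secondaryStore.awaitTermination(1, TimeUnit.SECONDS);
        primaryStore.shutdown();
        primaryStore.awaitTermination(1, TimeUnit.SECONDS);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```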

