C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280
Thanks for the in-depth analysis! I think part of the problem here stems from trying to make our internal `Future`-based API cooperate with Java's `CompletableFuture` API. I've sketched out something below that doesn't rely on `CompletableFuture` and (I believe) preserves the semantics we want for asynchronicity:

```java
public class ConnectorOffsetBackingStore implements OffsetBackingStore {

    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        final OffsetBackingStore primaryStore;
        final OffsetBackingStore secondaryStore;
        if (connectorStore.isPresent()) {
            primaryStore = connectorStore.get();
            secondaryStore = workerStore.orElse(null);
        } else if (workerStore.isPresent()) {
            primaryStore = workerStore.get();
            secondaryStore = null;
        } else {
            // Should never happen since we check for this case in the constructor, but just in case,
            // this should be more informative than the NPE that would otherwise be thrown
            throw new IllegalStateException("At least one non-null offset store must be provided");
        }

        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset == null) {
                tombstoneOffsets.put(partition, null);
            } else {
                regularOffsets.put(partition, offset);
            }
        });

        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
            return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                if (tombstoneWriteError != null) {
                    log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                    try (LoggingContext context = loggingContext()) {
                        callback.onCompletion(tombstoneWriteError, ignored);
                    }
                    return;
                }

                setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
            });
        } else {
            return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
        }
    }

    private Future<Void> setPrimaryThenSecondary(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
            Callback<Void> callback
    ) {
        return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
            if (secondaryStore != null) {
                if (primaryWriteError != null) {
                    log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                } else {
                    try {
                        // Invoke OffsetBackingStore::set but ignore the resulting future; we don't
                        // block on writes to this backing store.
                        secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                            try (LoggingContext context = loggingContext()) {
                                if (secondaryWriteError != null) {
                                    log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                } else {
                                    log.debug("Successfully flushed offsets to secondary backing store");
                                }
                            }
                        });
                    } catch (Exception e) {
                        log.warn("Failed to write offsets to secondary backing store", e);
                    }
                }
            }
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(primaryWriteError, ignored);
            }
        });
    }
}
```

It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs` fields. If this looks acceptable, I think the only question left is whether out-of-order writes are possible because of how things are chained.
I believe this is only a problem for non-exactly-once source tasks (since we only have at most one in-flight offset commit at a time when exactly-once support is enabled), and should be handled gracefully by `OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of eyes to make sure.
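To make the intended write ordering concrete, here's a toy, self-contained version of the chaining above. Everything in it (the `Store` interface, the `named` factory, and the synchronous callbacks) is a stand-in for illustration, not the real `OffsetBackingStore` API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class ChainingDemo {

    // Toy stand-in for OffsetBackingStore; completes synchronously and always succeeds
    interface Store {
        void set(Map<String, String> values, BiConsumer<Throwable, Void> callback);
    }

    static Store named(String name) {
        return (values, callback) -> {
            System.out.println(name + " <- " + values);
            callback.accept(null, null);
        };
    }

    public static void main(String[] args) {
        Store primary = named("primary (connector store)");
        Store secondary = named("secondary (worker store)");

        Map<String, String> values = new HashMap<>();
        values.put("p1", "offset-42");
        values.put("p2", null); // tombstone

        Map<String, String> regular = new HashMap<>();
        Map<String, String> tombstones = new HashMap<>();
        values.forEach((p, o) -> (o == null ? tombstones : regular).put(p, o));

        // Same order as the sketch: tombstones to the secondary store first, then the
        // complete batch to the primary store, then non-tombstones (fire-and-forget)
        // back to the secondary store
        secondary.set(tombstones, (tombstoneError, v1) -> {
            if (tombstoneError != null) {
                return; // skip the primary write entirely
            }
            primary.set(values, (primaryError, v2) -> {
                if (primaryError == null) {
                    secondary.set(regular, (secondaryError, v3) -> { /* log-only */ });
                }
            });
        });
    }
}
```

When every write succeeds, this prints the three writes in the order the sketch guarantees: tombstones to the secondary store, the complete batch to the primary store, then the remaining offsets to the secondary store.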
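And for completeness, a minimal sketch of the commit path I have in mind around `OffsetStorageWriter`. This assumes the non-blocking boolean `beginFlush()` variant, and the `CommitSketch` class, `commitOffsets` wrapper, and `timeoutMs` parameter are illustrative rather than the actual `WorkerSourceTask` code:

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.storage.OffsetStorageWriter;

public class CommitSketch {

    // Illustrative wrapper; not the actual worker commit loop
    public static void commitOffsets(OffsetStorageWriter writer, long timeoutMs) {
        if (!writer.beginFlush()) {
            return; // no offsets to flush
        }

        Future<Void> flushFuture = writer.doFlush((error, result) -> {
            // Invoked asynchronously once the underlying store write completes
        });

        try {
            flushFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            // Merges the in-flight offsets back into the writer's pending data so the next
            // flush retries them; this is why a failed or timed-out write shouldn't lose
            // offsets even if writes complete out of order
            writer.cancelFlush();
        }
    }
}
```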