C0urante commented on PR #13801:
URL: https://github.com/apache/kafka/pull/13801#issuecomment-1919475280
Thanks for the in-depth analysis! I think part of the problem here stems
from trying to make our internal `Future`-based API cooperate with Java's
`CompletableFuture` API. I've sketched out something below that doesn't rely
on `CompletableFuture` and (I believe) preserves the semantics we want for
asynchronicity:
```java
public class ConnectorOffsetBackingStore implements OffsetBackingStore {

    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
        final OffsetBackingStore primaryStore;
        final OffsetBackingStore secondaryStore;
        if (connectorStore.isPresent()) {
            primaryStore = connectorStore.get();
            secondaryStore = workerStore.orElse(null);
        } else if (workerStore.isPresent()) {
            primaryStore = workerStore.get();
            secondaryStore = null;
        } else {
            // Should never happen since we check for this case in the constructor, but just in case,
            // this should be more informative than the NPE that would otherwise be thrown
            throw new IllegalStateException("At least one non-null offset store must be provided");
        }

        Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>();
        Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>();
        values.forEach((partition, offset) -> {
            if (offset == null) {
                tombstoneOffsets.put(partition, null);
            } else {
                regularOffsets.put(partition, offset);
            }
        });

        if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
            return secondaryStore.set(tombstoneOffsets, (tombstoneWriteError, ignored) -> {
                if (tombstoneWriteError != null) {
                    log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError);
                    try (LoggingContext context = loggingContext()) {
                        callback.onCompletion(tombstoneWriteError, ignored);
                    }
                    return;
                }

                setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
            });
        } else {
            return setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback);
        }
    }

    private Future<Void> setPrimaryThenSecondary(
            OffsetBackingStore primaryStore,
            OffsetBackingStore secondaryStore,
            Map<ByteBuffer, ByteBuffer> completeOffsets,
            Map<ByteBuffer, ByteBuffer> nonTombstoneOffsets,
            Callback<Void> callback
    ) {
        return primaryStore.set(completeOffsets, (primaryWriteError, ignored) -> {
            if (secondaryStore != null) {
                if (primaryWriteError != null) {
                    log.trace("Skipping offsets write to secondary store because primary write has failed", primaryWriteError);
                } else {
                    try {
                        // Invoke OffsetBackingStore::set but ignore the resulting future; we don't block on writes
                        // to this backing store.
                        secondaryStore.set(nonTombstoneOffsets, (secondaryWriteError, ignored2) -> {
                            try (LoggingContext context = loggingContext()) {
                                if (secondaryWriteError != null) {
                                    log.warn("Failed to write offsets to secondary backing store", secondaryWriteError);
                                } else {
                                    log.debug("Successfully flushed offsets to secondary backing store");
                                }
                            }
                        });
                    } catch (Exception e) {
                        log.warn("Failed to write offsets to secondary backing store", e);
                    }
                }
            }
            try (LoggingContext context = loggingContext()) {
                callback.onCompletion(primaryWriteError, ignored);
            }
        });
    }
}
```
It also obviates the need for the `exactlyOnce` and `offsetFlushTimeoutMs`
fields.
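For context on why the bridging mentioned above is painful: a plain `Future` exposes no completion callback, so adapting one into a `CompletableFuture` generally means dedicating a thread to block on `get()`. A minimal standalone sketch of that (class and method names here are hypothetical, not from this PR):

```java
import java.util.concurrent.*;

public class FutureBridgeSketch {
    // Hypothetical bridge: there is no non-blocking way to observe a plain
    // Future's completion, so some worker thread must block on get().
    static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
        CompletableFuture<T> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                result.complete(future.get()); // blocks this worker thread
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> plain = pool.submit(() -> "offsets flushed");
        // The bridge works, but at the cost of tying up a pool thread per pending Future.
        String value = toCompletable(plain, pool).join();
        System.out.println(value);
        pool.shutdown();
    }
}
```

The callback-based sketch above sidesteps this cost entirely, since the stores already invoke callbacks on completion.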
If this looks acceptable, I think the only question left is whether
out-of-order writes are possible because of how things are chained. I believe
this is only a problem for non-exactly-once source tasks (since we only have at
most one in-flight offset commit at a time when exactly-once support is
enabled), and should be handled gracefully by
`OffsetStorageWriter::cancelFlush`, but it'd be nice to have a second pair of
eyes to make sure.
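To illustrate the ordering concern with a toy example (everything below is hypothetical, not Connect code): two commits issued back-to-back against an async store can complete in reverse order, and a `cancelFlush`-style "only apply the newest" guard is what keeps the stale write from clobbering the newer one:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class OutOfOrderSketch {
    // Hypothetical async store: completes each write on a background thread
    // after an arbitrary delay, so later writes can finish first.
    static Future<?> asyncSet(ExecutorService pool, long delayMs, Runnable callback) {
        return pool.submit(() -> {
            Thread.sleep(delayMs);
            callback.run();
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger lastApplied = new AtomicInteger(-1);
        CountDownLatch done = new CountDownLatch(2);

        // Commit 1 is issued first but takes longer to land than commit 2.
        for (int commit : new int[] {1, 2}) {
            long delay = commit == 1 ? 200 : 20;
            asyncSet(pool, delay, () -> {
                // Guard in the spirit of cancelling a superseded flush:
                // never let an older commit overwrite a newer one.
                lastApplied.accumulateAndGet(commit, Math::max);
                done.countDown();
            });
        }
        done.await();
        System.out.println("last applied commit: " + lastApplied.get());
        pool.shutdown();
    }
}
```

Commit 2's callback fires first here, but the newest-wins guard means the result is the same regardless of completion order, which is the property we'd be relying on `OffsetStorageWriter` to provide.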