C0urante commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r732232701
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ########## @@ -163,13 +181,24 @@ public synchronized boolean beginFlush() { } // And submit the data - log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush); + log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), flushed); } - return backingStore.set(offsetsSerialized, (error, result) -> { - boolean isCurrent = handleFinishWrite(flushId, error, result); - if (isCurrent && callback != null) { - callback.onCompletion(error, result); + return primaryBackingStore.set(offsetsSerialized, (primaryError, primaryResult) -> { + boolean isCurrent = handleFinishWrite(flushId, primaryError, primaryResult); + if (isCurrent) { + if (callback != null) { + callback.onCompletion(primaryError, primaryResult); + } + if (secondaryBackingStore != null && primaryError == null) { + secondaryBackingStore.set(offsetsSerialized, (secondaryError, secondaryResult) -> { + if (secondaryError != null) { + log.warn("Failed to write offsets ({}) to secondary backing store", flushed, secondaryError); + } else { + log.debug("Successfully flushed offsets ({}) to secondary backing store", flushed); Review comment: Good catch; I believe there's also a case in the existing case where offset commit messages that are logged in a producer callback are also missing the MDC context. I've addressed both cases. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> Review comment: Ack, removed. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * </p> Review comment: Ack, removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org