C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r732232701



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##########
@@ -163,13 +181,24 @@ public synchronized boolean beginFlush() {
             }
 
             // And submit the data
-            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush);
+            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), flushed);
         }
 
-        return backingStore.set(offsetsSerialized, (error, result) -> {
-            boolean isCurrent = handleFinishWrite(flushId, error, result);
-            if (isCurrent && callback != null) {
-                callback.onCompletion(error, result);
+        return primaryBackingStore.set(offsetsSerialized, (primaryError, primaryResult) -> {
+            boolean isCurrent = handleFinishWrite(flushId, primaryError, primaryResult);
+            if (isCurrent) {
+                if (callback != null) {
+                    callback.onCompletion(primaryError, primaryResult);
+                }
+                if (secondaryBackingStore != null && primaryError == null) {
+                    secondaryBackingStore.set(offsetsSerialized, (secondaryError, secondaryResult) -> {
+                        if (secondaryError != null) {
+                            log.warn("Failed to write offsets ({}) to secondary backing store", flushed, secondaryError);
+                        } else {
+                            log.debug("Successfully flushed offsets ({}) to secondary backing store", flushed);

Review comment:
       Good catch; I believe there's also an existing case where offset commit
messages logged in a producer callback are missing the MDC context. I've
addressed both cases.
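
       For readers unfamiliar with the problem: a logging context held in thread-local storage is not visible when a callback runs on a different thread (for example, a producer's I/O thread). The following is a minimal, self-contained sketch of one way to carry the caller's context into a callback. It is not the code in this PR; `CONTEXT` is a hypothetical stand-in for a logging MDC, and `withCallerContext`, the class name, and the key/value used are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative sketch only; all names here are hypothetical.
final class ContextPreservingCallback {

    // Stand-in for a logging MDC: a per-thread map of context entries.
    static final ThreadLocal<Map<String, String>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Snapshots the calling thread's context now, and installs that snapshot
    // around the delegate whenever (and on whichever thread) it later runs.
    static <E, R> BiConsumer<E, R> withCallerContext(BiConsumer<E, R> delegate) {
        Map<String, String> captured = new HashMap<>(CONTEXT.get());
        return (error, result) -> {
            Map<String, String> previous = CONTEXT.get();
            CONTEXT.set(new HashMap<>(captured));
            try {
                delegate.accept(error, result);
            } finally {
                // Restore whatever the executing thread had before.
                CONTEXT.set(previous);
            }
        };
    }

    // Runs a callback on a separate thread and reports which context value it saw.
    static String contextSeenOnOtherThread(boolean wrap) {
        CONTEXT.get().put("connector.context", "[my-connector|task-0]");
        String[] seen = new String[1];
        BiConsumer<Throwable, Void> callback =
                (error, result) -> seen[0] = CONTEXT.get().get("connector.context");
        BiConsumer<Throwable, Void> toRun = wrap ? withCallerContext(callback) : callback;

        Thread other = new Thread(() -> toRun.accept(null, null));
        other.start();
        try {
            other.join(); // join() makes the write to seen[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("wrapped:   " + contextSeenOnOtherThread(true));
        System.out.println("unwrapped: " + contextSeenOnOtherThread(false));
    }
}
```

       In this sketch the unwrapped callback sees no context on the other thread, while the wrapped one sees the value captured on the submitting thread. With a real logging framework the same shape would apply to its MDC, assuming it exposes a way to copy and restore the context map.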

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>

Review comment:
       Ack, removed.

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>

Review comment:
       Ack, removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


