vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259538919


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
Review Comment:
   hmm, this is a bit tricky. This is what the `commitTransaction` method does =>
   
https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L303-L312
   
   which basically says `Blocks until all outstanding records have been sent and ack'd`. This is what the KIP says as well:
   
   ```
   The worker-level [offset.flush.timeout.ms](https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms) property will be ignored for exactly-once source tasks. They will be allowed to take as long as necessary to complete an offset commit, since the cost of failure at that point is to fail the source task. Currently, all source task offset commits take place on a single shared worker-global thread. In order to support source task commits without a timeout, but also prevent laggy tasks from disrupting the availability of other tasks on the cluster, the worker will be modified to permit simultaneous source task offset commits.
   
   It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune their producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval (if using interval-based transaction boundaries), or switch to the poll value for the transaction.boundary property. We will include these steps in the error message for a task that fails due to producer transaction timeout.
   ```
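   
   For reference, the remedial actions the KIP lists map roughly onto the following worker/connector properties; the values below are illustrative assumptions, not recommendations:
   
   ```properties
   # Connector-level override of the producer's transaction timeout (illustrative value).
   producer.override.transaction.timeout.ms=900000
   # Worker-level offset commit interval; lowering it shrinks each transaction
   # when using interval-based transaction boundaries (illustrative value).
   offset.flush.interval.ms=10000
   # Commit a transaction for every batch returned from poll() instead of on an interval.
   transaction.boundary=poll
   ```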
   
   Keeping these things in mind, I had assumed it's good to wait forever, but I also see your point. I don't think the producer used to write to the global offsets topic would even be a transactional one, and yet we are waiting.
   
   I think @C0urante would be the best person to answer this at this point, but my opinion is to block forever in this case. That said, I'm open to suggestions.
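   
   To make the trade-off concrete, here's a minimal stand-alone sketch of the two waiting strategies, using `CompletableFuture` as a hypothetical stand-in for Connect's `FutureCallback` (class name, latencies, and the 200 ms bound are all illustrative, not from the PR):
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   public class OffsetWriteWait {
       public static void main(String[] args) throws Exception {
           // Hypothetical stand-in for the secondary store's write acknowledgement.
           CompletableFuture<Void> secondaryWriteFuture = new CompletableFuture<>();
           ExecutorService pool = Executors.newSingleThreadExecutor();
           pool.submit(() -> {
               try {
                   Thread.sleep(100); // simulate broker ack latency
               } catch (InterruptedException ignored) {
               }
               secondaryWriteFuture.complete(null);
           });
   
           // Option 1 (EOS semantics, per the KIP): block for as long as the commit takes.
           secondaryWriteFuture.get();
           System.out.println("secondary write acked");
   
           // Option 2 (non-EOS semantics): bound the wait, mirroring offset.flush.timeout.ms.
           CompletableFuture<Void> neverAcked = new CompletableFuture<>();
           try {
               neverAcked.get(200, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               System.out.println("offset write timed out");
           }
           pool.shutdown();
       }
   }
   ```
   
   The difference is just `get()` vs `get(timeout, unit)`; the open question above is which of the two the tombstone path should use for the global offsets topic.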
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
