vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1259538919
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,51 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
 
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to the secondary store will
+        // not be ignored. Also, for tombstone records, we first write to the secondary store and
+        // then to the primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
+            secondaryStore.set(values, secondaryWriteFuture);
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when

Review Comment:
   hmm, this is a bit tricky. This is what the `commitTransaction` method does => https://github.com/apache/kafka/blob/6368d14a1d8c37305290b8b89fb5990ad07aa4db/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L303-L312 which basically says `Blocks until all outstanding records have been sent and ack'd`. This is what the KIP says as well:
   
   ```
   The worker-level [offset.flush.timeout.ms](https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms) property will be ignored for exactly-once source tasks. They will be allowed to take as long as necessary to complete an offset commit, since the cost of failure at that point is to fail the source task. Currently, all source task offset commits take place on a single shared worker-global thread. In order to support source task commits without a timeout, but also prevent laggy tasks from disrupting the availability of other tasks on the cluster, the worker will be modified to permit simultaneous source task offset commits.
   
   It may take longer than the transaction timeout for a task to flush all of its records to Kafka. In this case, there are some remedial actions that users can take to nurse their connector back to health: tune their producer configuration for higher throughput, increase the transaction timeout for the producers used by the connector, decrease the offset commit interval (if using interval-based transaction boundaries), or switch to the poll value for the transaction.boundary property. We will include these steps in the error message for a task that fails due to producer transaction timeout.
   ```
   
   Keeping these things in mind, I had assumed it's good to wait forever. But I also see your point: I don't think the producer used to write to the global offsets topic would even be a transactional one, and yet we are waiting. I think @C0urante would be the best person to answer this at this point, but my opinion is to block forever in this case. But I am open to suggestions.
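   To make the two options concrete, here is a rough sketch of what I'm picturing (this is not code from the PR; the class name `TombstoneOffsetWriteSketch`, the helper `writeToSecondaryAndWait`, and the `exactlyOnce` / `flushTimeoutMs` parameters are made up for illustration): under EOS the caller blocks indefinitely on the `FutureCallback`, while the non-EOS path bounds the wait with something like `offset.flush.timeout.ms`:
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   import org.apache.kafka.connect.errors.ConnectException;
   import org.apache.kafka.connect.storage.OffsetBackingStore;
   import org.apache.kafka.connect.util.FutureCallback;
   
   public class TombstoneOffsetWriteSketch {
   
       /**
        * Hypothetical helper: write tombstone-bearing offsets to the secondary
        * (worker-global) store and wait for that write to complete before the
        * caller moves on to the primary (connector-specific) store.
        */
       static void writeToSecondaryAndWait(
               OffsetBackingStore secondaryStore,
               Map<ByteBuffer, ByteBuffer> values,
               boolean exactlyOnce,
               long flushTimeoutMs) {
           FutureCallback<Void> secondaryWriteFuture = new FutureCallback<>();
           secondaryStore.set(values, secondaryWriteFuture);
           try {
               if (exactlyOnce) {
                   // Mirror the EOS semantics of commitTransaction: no timeout,
                   // the offset commit may take as long as it needs.
                   secondaryWriteFuture.get();
               } else {
                   // Non-EOS path: bound the wait instead of blocking forever
                   // on what is likely a non-transactional producer.
                   secondaryWriteFuture.get(flushTimeoutMs, TimeUnit.MILLISECONDS);
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new ConnectException("Interrupted while flushing tombstone offsets to the secondary store", e);
           } catch (ExecutionException | TimeoutException e) {
               // Tombstone writes to the secondary store must not be ignored,
               // so surface the failure and fail the offset commit.
               throw new ConnectException("Failed to flush tombstone offsets to the secondary store", e);
           }
       }
   }
   ```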