rhauch commented on a change in pull request #11323: URL: https://github.com/apache/kafka/pull/11323#discussion_r727228287
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were + * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via + * {@link #committableOffsets()}, the latest-eligible offsets for each source partition can be retrieved, where every + * record up to and including the record for each returned offset has been either + * {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}. 
Review comment: Since I'm asking about another change, I think there is a copy-paste grammatical error in this sentence we could fix. ```suggestion * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}. ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -378,7 +370,7 @@ private boolean sendRecords() { log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); producerSendException.compareAndSet(null, e); Review comment: We're not calling `submittedRecords.removeLastOccurrence(submittedRecord)` here. Were you thinking that we're setting the `producerSendException`, which will cause the `execute()` method to throw this same exception on the next pass and consequently fail the task? I think that's the right choice and no changes are required, but I do need to work through it. So pardon my thought process here. The question is: what happens to records (and `SubmittedRecord` objects and their offsets) that appear after the record that resulted in the asynchronous exception? What happens depends on what the producer behavior is, or might be in the future. IIRC the exceptions will often be unrecoverable, but it is possible that records could be sent successfully even if they were submitted to the producer _after_ the record that failed, especially when those records were sent to a different topic partition and were actually sent by the producer _before_ the record that failed. 
After all, from the [producer.send() JavaDoc](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L839): > Callbacks for records being sent to the same partition are guaranteed to execute in order. Unfortunately, we cannot infer a relationship between the topic partition for a record and its source partition. So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their `SubmittedRecord` instances are after the `SubmittedRecord` for the record that failed to send, and the latter would never be acked (as its send failed). But if any subsequent records were sent to a different topic partition but had a _different_ source partition, their `SubmittedRecord` instances would be in a different deque than the `SubmittedRecord` for the record that failed to send, and their offsets _could_ potentially be committed. If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -250,6 +248,9 @@ public void execute() { recordPollReturned(toSend.size(), time.milliseconds() - start); } } + + updateCommittableOffsets(); + Review comment: Sorry, maybe I wasn't clear in my [previous comment about this call](https://github.com/apache/kafka/pull/11323#discussion_r724437761). I think there is an edge case here that we could deal with a bit better. Consider the following scenario as we walk through the loop in `execute()`.
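To make the deque-per-source-partition argument concrete, here is a minimal, hypothetical sketch of that bookkeeping (illustrative names only, not the actual `SubmittedRecords` implementation): offsets are committable only up to the first un-acked record in each deque, so a failed (never-acked) send blocks later offsets for the *same* source partition while other source partitions can still advance.

```java
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical sketch of the deque-per-source-partition bookkeeping
// discussed above; names and structure are illustrative.
class SubmittedRecordsSketch {
    static final class Submitted {
        final Map<String, Object> sourcePartition;
        final Map<String, Object> sourceOffset;
        boolean acked;
        Submitted(Map<String, Object> partition, Map<String, Object> offset) {
            this.sourcePartition = partition;
            this.sourceOffset = offset;
        }
        void ack() { acked = true; }
    }

    // One deque of in-flight records per source partition.
    private final Map<Map<String, Object>, Deque<Submitted>> records = new HashMap<>();

    Submitted submit(Map<String, Object> partition, Map<String, Object> offset) {
        Submitted s = new Submitted(partition, offset);
        records.computeIfAbsent(partition, k -> new LinkedList<>()).add(s);
        return s;
    }

    // Drain acked records from the head of each deque; stop at the first
    // un-acked record, so a failed send stalls only its own source partition.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            while (!deque.isEmpty() && deque.peek().acked) {
                result.put(partition, deque.poll().sourceOffset);
            }
        });
        return result;
    }
}
```

Under this sketch, if source partition A submits offsets 1, 2, 3 and the send for offset 2 fails (never acked), `committableOffsets()` returns offset 1 for A, while an acked record in a different source partition B commits normally, which is exactly the behavior described above.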
The WorkerSourceTask is not paused, and has been sending and committing offsets for records. On some pass through the `execute()` while loop:

1. `shouldPause()` returns false
2. `maybeThrowProducerSendException()` does nothing since no exception was set from the producer
3. `poll()` is called to get new records from the source task
4. `updateCommittableOffsets()` is called to update the `committableOffsets` map for any records sent in previous loops that have been acked
5. `sendRecords()` is called with the records retrieved in step 3 earlier in this same pass, which for each of these new records enqueues a `SubmittedRecord` and calls `producer.send(...)` on each record with a callback that acks the submitted record.

But just after step 1 in the aforementioned pass, the connector and its tasks are paused. This means that on the next pass through the `WorkerSourceTask.execute()` while loop:

1. `shouldPause()` returns true, so
2. `onPause()` is called and `awaitUnpause()` is called.

At that point, the thread blocks. But the records that were sent to the producer in step 5 of the previous pass may have already been acked, meaning we _could_ have updated the offsets just before we paused. That might not have been enough time for all of the records submitted in that step to be acked, but if we were to move `updateCommittableOffsets()` to just before the `if (shouldPause())` check, then we would get the offsets for as many acked records as possible just before the thread pauses. In all other non-paused scenarios, I'm not sure it matters where in this loop we call `updateCommittableOffsets()`. But for the just-been-paused scenario, moving it to the first (or last) operation in the loop gives us a bit more of a chance to commit the offsets for as many acked records as possible. WDYT?
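The proposed reordering can be sketched as a toy loop (purely illustrative, not the actual `WorkerSourceTask` code) that records the order of operations: refreshing committable offsets at the top of each pass means records acked since the last pass are captured even when the very next thing the loop does is block on pause.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the proposed execute() loop ordering; method
// names mirror the discussion above but this is not the real WorkerSourceTask.
class ExecuteLoopSketch {
    final List<String> trace = new ArrayList<>();
    boolean paused;

    // One pass of the loop with updateCommittableOffsets() moved to just
    // before the pause check, so acked offsets are captured before blocking.
    void onePass() {
        trace.add("updateCommittableOffsets");
        if (paused) {
            trace.add("awaitUnpause"); // thread would block here
            return;
        }
        trace.add("maybeThrowProducerSendException");
        trace.add("poll");
        trace.add("sendRecords");
    }
}
```

Running one normal pass and then a just-been-paused pass shows `updateCommittableOffsets` executing immediately before `awaitUnpause`, which is the window the comment above is trying to exploit.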