rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r727228287



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
+ * {@link #committableOffsets()}, the latest-eligible offsets for each source partition can be retrieved, where every
+ * record up to and including the record for each returned offset has been either
+ * {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.

Review comment:
       Since I'm already asking for another change, I think there is a copy-paste grammatical error in this sentence that we could fix.
   ```suggestion
    * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via
    * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been
    * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}.
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -378,7 +370,7 @@ private boolean sendRecords() {
                            log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);

Review comment:
       We're not calling 
`submittedRecords.removeLastOccurrence(submittedRecord)` here. Were you 
thinking that we're setting the `producerSendException`, which will cause the 
`execute()` method to throw this same exception on the next pass and 
consequently fail the task? 
   
   I think that's the right choice and no changes are required, but I do need 
to work through it. So pardon my thought process here.
   
   The question is: what happens to records (and `SubmittedRecord` objects and 
their offsets) that appear after the record that resulted in the asynchronous 
exception?
   
   What happens depends on what the producer's behavior is, or might be in the future. IIRC the exceptions will often be unrecoverable, but it is possible that records could be sent successfully even if they were submitted to the producer _after_ the record that failed, especially when those records were sent to a different topic partition and were actually sent by the producer _before_ the record that failed. After all, from the [producer.send() JavaDoc](https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L839):
   > Callbacks for records being sent to the same partition are guaranteed to 
execute in order.
   
   Unfortunately, we cannot infer a relationship between the topic partition 
for a record and its source partition. So any subsequent records that were sent 
to a different topic partition could still have the same source partition, and 
thus they should be enqueued into the same deque. Those offsets would not be 
committed, since their `SubmittedRecord` instances are after the 
`SubmittedRecord` for the record that failed to send, and the latter would 
never be acked (as its send failed).
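   
   To make the ack/failure mechanics concrete, here's a minimal sketch of the send-callback pattern in question (simplified and hypothetical, not the exact production code; `producerRecord`, `submittedRecord`, and `producerSendException` are placeholder names following the diff above):
   ```java
   producer.send(producerRecord, (recordMetadata, e) -> {
       if (e != null) {
           // The send failed: remember the first exception so execute() can rethrow it on
           // its next pass and fail the task. The SubmittedRecord is never acked, so no
           // offset at or beyond it (within the same source partition's deque) ever
           // becomes committable.
           producerSendException.compareAndSet(null, e);
       } else {
           // The send succeeded: ack the record so its offset becomes eligible the next
           // time committable offsets are computed.
           submittedRecord.ack();
       }
   });
   ```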
   
   But if any subsequent records were sent to a different topic partition but 
had a _different_ source partition, their `SubmittedRecord` instances would be 
in a different deque than the `SubmittedRecord` for the record that failed to 
send, and their offsets _could_ potentially be committed.
   
   If the offset commit were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.
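   
   And to spell out the per-partition rule being discussed, here's a rough standalone sketch (hypothetical accessors like `acked()` and `offset()`, not necessarily the actual `SubmittedRecords` internals):
   ```java
   // Within each source partition's deque, only the leading run of acknowledged records
   // is committable; the first unacked record (e.g. a failed send) blocks everything
   // enqueued behind it in that deque, while deques for other source partitions are
   // unaffected.
   Map<Map<String, Object>, Map<String, Object>> committableOffsets(
           Map<Map<String, Object>, Deque<SubmittedRecord>> recordsBySourcePartition) {
       Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
       recordsBySourcePartition.forEach((sourcePartition, deque) -> {
           Map<String, Object> latestCommittable = null;
           for (SubmittedRecord record : deque) {
               if (!record.acked()) {
                   break; // stop at the first unacked record in this deque
               }
               latestCommittable = record.offset();
           }
           if (latestCommittable != null) {
               offsets.put(sourcePartition, latestCommittable);
           }
       });
       return offsets;
   }
   ```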

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -250,6 +248,9 @@ public void execute() {
                        recordPollReturned(toSend.size(), time.milliseconds() - start);
                     }
                 }
+
+                updateCommittableOffsets();
+

Review comment:
       Sorry, maybe I wasn't clear in my [previous comment about this call](https://github.com/apache/kafka/pull/11323#discussion_r724437761). I think there is an edge case here that we could deal with a bit better. Consider the following scenario as we walk through the loop in `execute()`. The WorkerSourceTask is not paused, and has been sending and committing offsets for records.
   
   On some pass through the `execute()` while loop:
   1. `shouldPause()` returns false
   2. `maybeThrowProducerSendException()` does nothing since no exception was 
set from the producer
   3. `poll()` is called to get new records from the source task;
   4. `updateCommittableOffsets()` is called to update the `committableOffsets` 
map for any records sent in previous loops that have been acked
   5. `sendRecords()` is called with the records retrieved in step 3 earlier in this same pass; for each of these new records it enqueues a `SubmittedRecord` and calls `producer.send(...)` with a callback that acks the submitted record.
   
   But just after step 1 in the aforementioned pass, the connector and its 
tasks are paused. This means that the next pass through the 
`WorkerSourceTask.execute()` while loop:
   1. `shouldPause()` returns true, so
   2. `onPause()` is called and `awaitUnpause()` is called.
   
   At that point, the thread blocks. But the records that were sent to the producer in step 5 of the previous pass may have already been acked, meaning we _could_ have updated the offsets just before we paused. That might not have been enough time for all of the records submitted in that step to be acked, but if we were to move the `updateCommittableOffsets()` call to just before the `if (shouldPause())` check then we would get the offsets for as many acked records as possible just before the thread pauses (see the sketch below).
   
   In all other non-paused scenarios, I'm not sure it matters where in this 
loop we call `updateCommittableOffsets()`. But for the just-been-paused 
scenario, moving it to the first (or last) operation in the loop gives us a bit 
more of a chance to commit the offsets for as many acked records as possible.
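   
   For clarity, here's a condensed sketch of the ordering I'm suggesting (hypothetical and heavily simplified from `WorkerSourceTask.execute()`; retry, stop, and error handling omitted):
   ```java
   // Updating committable offsets first means a just-paused task still picks up offsets
   // for records that were acked since the last pass, before the thread blocks in
   // awaitUnpause().
   while (!isStopping()) {
       updateCommittableOffsets();          // moved to the top of the loop

       if (shouldPause()) {
           onPause();
           if (awaitUnpause()) {
               onResume();
           }
           continue;
       }

       maybeThrowProducerSendException();

       if (toSend == null) {
           toSend = poll();                 // step 3 above
       }
       if (toSend != null) {
           sendRecords();                   // step 5 above: enqueue SubmittedRecords and send
       }
   }
   ```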
   
   WDYT?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       See [this comment in a subsequent 
review](https://github.com/apache/kafka/pull/11323#discussion_r727223026) for a 
followup.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       See [this comment in a subsequent 
review](https://github.com/apache/kafka/pull/11323#discussion_r727223026) for a 
followup. Unresolving for better visibility.



