rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r724430246



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched 
to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are 
submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for 
each source partition can be retrieved via
+ * {@link #committableOffsets()}, the latest-eligible offsets for each source 
partition can be retrieved, where every
+ * record up to and including the record for each returned offset has been 
either
+ * {@link SubmittedRecord#ack() acknowledged} or {@link 
#remove(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} 
can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = 
LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link 
SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #remove(SubmittedRecord) removed} if the 
record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but 
may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link 
SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link 
SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #remove removed} if synchronously 
rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), 
(Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> 
offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when 
tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * @param record the {@link #submit previously-submitted} record to stop 
tracking; may not be null
+     */
+    public void remove(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record for partition {}, but no 
records with that partition are present", record.partition());

Review comment:
       IIUC this is really an unexpected condition, since it's single-threaded 
and this method is called only in the catch block after sending a record to the 
producer. But without that context, anyone reading this message in the log file 
might be confused or even concerned about what "remove record for partition..." 
means. WDYT about mentioning more of that context, something like:
   ```suggestion
               log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition());
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly 
the current state. This
-            // means both the current set of messages we're still waiting to 
finish, stored in this
-            // class, which setting flushing = true will handle by storing any 
new values into a new
-            // buffer; and the current set of user-specified offsets, stored 
in the
-            // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there 
aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish 
sending
-            log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be 
sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting 
for producer to flush outstanding {} messages", this, 
outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, 
null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing 
when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't 
safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets 
will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();

Review comment:
       In any of the cases below where we call `offsetWriter.cancelFlush()`, it 
appears that we're relying upon the offset writer to keep the offsets that it 
was unable to flush -- we're always computing new offsets at this point. 
   
   WDYT about adding a comment above line 474, something along the lines of:
   ```
    // Update the offset writer with any new offsets for records that have been acked.
    // The offset writer will continue to track all offsets until they are able to be
    // successfully flushed. IOW, if the offset writer fails to flush, it keeps those
    // offsets for the next attempt, though we may update them here with newer offsets
    // for acked records.
   ```
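
To make the retention behavior concrete, here is a minimal sketch of the pattern, assuming a simplified writer. `SketchOffsetWriter` and its `pending()` helper are hypothetical stand-ins written for illustration, not the real `OffsetStorageWriter`, and partitions and offsets are reduced to `String`/`Long` for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetRetentionSketch {

    // Hypothetical stand-in for the offset writer (assumption: not the real class).
    static class SketchOffsetWriter {
        private Map<String, Long> data = new HashMap<>(); // offsets waiting for a flush
        private Map<String, Long> toFlush = null;         // snapshot taken by beginFlush()

        void offset(String partition, Long offset) {
            data.put(partition, offset);
        }

        // Only snapshots the pending offsets; returns false if there is nothing to flush.
        boolean beginFlush() {
            if (data.isEmpty()) {
                return false;
            }
            toFlush = data;
            data = new HashMap<>();
            return true;
        }

        // A failed flush folds the snapshot back into the pending offsets,
        // letting any newer offsets written in the meantime win.
        void cancelFlush() {
            if (toFlush != null) {
                toFlush.putAll(data);
                data = toFlush;
                toFlush = null;
            }
        }

        // Hypothetical inspection helper for this sketch.
        Map<String, Long> pending() {
            return new HashMap<>(data);
        }
    }

    static Map<String, Long> demo() {
        SketchOffsetWriter writer = new SketchOffsetWriter();

        // First commit attempt: hand over the committable offsets, then fail the flush.
        Map<String, Long> firstAttempt = new HashMap<>();
        firstAttempt.put("p1", 10L);
        firstAttempt.put("p2", 5L);
        firstAttempt.forEach(writer::offset);
        writer.beginFlush();
        writer.cancelFlush(); // simulated timeout while flushing

        // Second attempt starts from a brand-new map; only p1 has a newer acked offset.
        Map<String, Long> secondAttempt = new HashMap<>();
        secondAttempt.put("p1", 12L);
        secondAttempt.forEach(writer::offset);

        // p2's offset from the failed attempt is still tracked by the writer.
        return writer.pending();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch, replacing the caller's map on every attempt loses nothing, because the writer itself holds on to whatever it failed to flush.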

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -570,22 +512,21 @@ public boolean commitOffsets() {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", 
this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
             log.error("{} Timed out waiting to flush offsets to storage", 
this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();

Review comment:
       So
   > Timed out waiting to flush offsets to storage; will try again on next 
flush interval with latest offsets
   
   Sounds good. 
   
   BTW, the fact that the offset writer continues to track the offsets after 
failed flush attempts is a subtle thing, and it's worth calling out above (see 
my comment on lines 476-477) to help explain why we can always replace the 
`offsetsToCommit` map even after the offset writer failed to flush.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = 
Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = 
Collections.singletonMap("subreddit", "adifferentvalue");
+    private static final Map<String, Object> PARTITION3 = 
Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+        assertEmptyRecords();
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, 
PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, 
offset);
+        // Record has been submitted but not yet acked; cannot commit offsets 
for it yet
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        submittedRecord.ack();
+        // Record has been acked; can commit offsets for it
+        assertEquals(Collections.singletonMap(PARTITION1, offset), 
submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been 
cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = 
submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = 
submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = 
submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = 
submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record2.ack();
+        // One record has been ack'd, but a record that comes before it and 
corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition2Record1.ack();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new 
HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been 
cleaned up to avoid memory leaks
+        assertEmptyRecords();
+
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveLastSubmittedRecord() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, 
newOffset());
+        submittedRecords.remove(submittedRecord);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd 
anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        submittedRecord.ack();
+        // Even though the record has somehow been acknowledged, it should not 
be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+    }
+
+    @Test
+    public void testRemoveNotLastSubmittedRecord() {
+        Map<String, Object> partition1Offset = newOffset();
+        Map<String, Object> partition2Offset = newOffset();
+
+        SubmittedRecord recordToRemove = submittedRecords.submit(PARTITION1, 
partition1Offset);
+        SubmittedRecord lastSubmittedRecord = 
submittedRecords.submit(PARTITION2, partition2Offset);
+
+        assertNoEmptyDeques();
+
+        submittedRecords.remove(recordToRemove);
+
+        assertNoEmptyDeques();
+        // The only record for this partition has been removed; we shouldn't 
be tracking a deque for it anymore
+        assertRemovedDeques(PARTITION1);
+
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd 
anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        recordToRemove.ack();
+        // Even though the record has somehow been acknowledged, it should not 
be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), 
submittedRecords.committableOffsets());
+
+        assertNoEmptyDeques();
+
+        lastSubmittedRecord.ack();
+        // Now that the last-submitted record has been ack'd, we should be 
able to commit its offset
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), 
submittedRecords.committableOffsets());
+
+        // Everything has been ack'd and consumed; make sure that it's been 
cleaned up to avoid memory leaks
+        assertEmptyRecords();

Review comment:
       Also?
   ```suggestion
           assertEmptyRecords();
           assertNoEmptyDeques();
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -246,6 +240,7 @@ public void execute() {
                 }
 
                 maybeThrowProducerSendException();
+                updateCommittableOffsets();

Review comment:
       Actually, I now have a question: why did you choose to add it _before_ 
the `poll()` (a few lines down) rather than after, perhaps after the `if 
(!sendRecords()) {...}` block below?
   
   The reason I ask is that if one iteration of the while loop polls for 
records and sends them (where they are sent to the producer and asynchronously 
acked), but then the connector is paused at about the same time, then the 
offsets for those records will not be committed until after the connector is 
resumed above. Is that intentional?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched 
to a producer and their accompanying
+ * source offsets.
+ * Note that this class is not thread-safe.
+ */
+class SubmittedRecords {
+    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * @param record the record about to be dispatched; may not be null but 
may have a null
+     * {@link SourceRecord#sourcePartition()} and/or {@link 
SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link 
SubmittedRecord#ack() acknowledged} once ack'd by
+     * the producer, or {@link #remove removed} if synchronously rejected by 
the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), 
(Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> 
offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when 
tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.

Review comment:
       > * By clarifying in the Javadoc that, if multiple instances of the same 
SubmittedRecord have been submitted already, only the first one found 
(traversing from the end of the deque backward) will be removed
   
   I think this would be great.
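
For reference, a small sketch of the "last occurrence" semantics such Javadoc would describe, assuming the removal is done with the standard `Deque.removeLastOccurrence` (the hunk above does not show the method body, so that is an assumption). Plain strings stand in for `SubmittedRecord` instances.

```java
import java.util.Deque;
import java.util.LinkedList;

public class RemoveLastOccurrenceSketch {

    static Deque<String> demo() {
        Deque<String> deque = new LinkedList<>();
        deque.add("a");
        deque.add("b");
        deque.add("a"); // same element submitted a second time
        deque.add("c");

        // Searches from the tail toward the head and removes only the first match found,
        // so the earlier "a" at the head of the deque is left in place.
        deque.removeLastOccurrence("a");
        return deque;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```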

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -495,56 +471,25 @@ public boolean commitOffsets() {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly 
the current state. This
-            // means both the current set of messages we're still waiting to 
finish, stored in this
-            // class, which setting flushing = true will handle by storing any 
new values into a new
-            // buffer; and the current set of user-specified offsets, stored 
in the
-            // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there 
aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish 
sending
-            log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be 
sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting 
for producer to flush outstanding {} messages", this, 
outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, 
null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing 
when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't 
safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets 
will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = new HashMap<>();
+        }
 
-            if (!flushStarted) {
-                // There was nothing in the offsets to process, but we still 
waited for the data in the
-                // buffer to flush. This is useful since this can feed into 
metrics to monitor, e.g.
-                // flush time, which can be used for monitoring even if the 
connector doesn't record any
-                // offsets.
-                finishSuccessfulFlush();
-                long durationMillis = time.milliseconds() - started;
-                recordCommitSuccess(durationMillis);
-                log.debug("{} Finished offset commitOffsets successfully in {} 
ms",
-                        this, durationMillis);
-
-                commitSourceTask();
-                return true;
-            }
+        offsetsToCommit.forEach(offsetWriter::offset);
+        if (!offsetWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still 
waited for the data in the
+            // buffer to flush. This is useful since this can feed into 
metrics to monitor, e.g.

Review comment:
       What do you mean by "we still waited for the data in the buffer to 
flush"? The `beginFlush()` method doesn't actually do any flushing; it merely 
performs the snapshot of the offset writer's data.





