rhauch commented on a change in pull request #11323: URL: https://github.com/apache/kafka/pull/11323#discussion_r724430246
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were + * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via + * {@link #committableOffsets()}, the latest-eligible offsets for each source partition can be retrieved, where every + * record up to and including the record for each returned offset has been either + * {@link SubmittedRecord#ack() acknowledged} or {@link #remove(SubmittedRecord) removed}. 
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be + * {@link SubmittedRecord#ack() acknowledged} from a different thread. + */ +class SubmittedRecords { + + private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class); + + // Visible for testing + final Map<Map<String, Object>, Deque<SubmittedRecord>> records; + + public SubmittedRecords() { + this.records = new HashMap<>(); + } + + /** + * Enqueue a new source record before dispatching it to a producer. + * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the + * producer callback, or {@link #remove(SubmittedRecord) removed} if the record could not be successfully + * sent to the producer. + * + * @param record the record about to be dispatched; may not be null but may have a null + * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()} + * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by + * the producer, or {@link #remove removed} if synchronously rejected by the producer + */ + @SuppressWarnings("unchecked") + public SubmittedRecord submit(SourceRecord record) { + return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset()); + } + + // Convenience method for testing + SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) { + SubmittedRecord result = new SubmittedRecord(partition, offset); + records.computeIfAbsent(result.partition(), p -> new LinkedList<>()) + .add(result); + return result; + } + + /** + * Remove a source record and do not take it into account any longer when tracking offsets. + * Useful if the record has been synchronously rejected by the producer. 
+ * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null + */ + public void remove(SubmittedRecord record) { + Deque<SubmittedRecord> deque = records.get(record.partition()); + if (deque == null) { + log.warn("Attempted to remove record for partition {}, but no records with that partition are present", record.partition()); Review comment: IIUC this is really an unexpected condition, since it's single-threaded and this method is called only in the catch block after sending a record to the producer. But without that context, anyone reading this message in the log file might be confused or even concerned about what "remove record for partition..." means. WDYT about mentioning more of that context, something like: ```suggestion log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition()); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -495,56 +471,25 @@ public boolean commitOffsets() { long started = time.milliseconds(); long timeout = started + commitTimeoutMs; + Map<Map<String, Object>, Map<String, Object>> offsetsToCommit; synchronized (this) { - // First we need to make sure we snapshot everything in exactly the current state. This - // means both the current set of messages we're still waiting to finish, stored in this - // class, which setting flushing = true will handle by storing any new values into a new - // buffer; and the current set of user-specified offsets, stored in the - // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. 
- flushing = true; - boolean flushStarted = offsetWriter.beginFlush(); - // Still wait for any producer records to flush, even if there aren't any offsets to write - // to persistent storage - - // Next we need to wait for all outstanding messages to finish sending - log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); - while (!outstandingMessages.isEmpty()) { - try { - long timeoutMs = timeout - time.milliseconds(); - // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain, - // we can stop flushing immediately - if (isCancelled() || timeoutMs <= 0) { - log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - this.wait(timeoutMs); - } catch (InterruptedException e) { - // We can get interrupted if we take too long committing when the work thread shutdown is requested, - // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need - // to stop immediately - log.error("{} Interrupted while flushing messages, offsets will not be committed", this); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - } + offsetsToCommit = this.committableOffsets; + this.committableOffsets = new HashMap<>(); Review comment: In any of the cases below where we call `offsetWriter.cancelFlush()`, it appears that we're relying upon the offset writer to keep the offsets that it was unable to flush -- we're always computing new offsets at this point. WDYT about adding a comment above line 474, something along the lines of: ``` // Update the offset writer with any new offsets for records that have been acked. 
// The offset writer will continue to track all offsets until they are able to be successfully flushed. // IOW, if the offset writer fails to flush, it keeps those offsets for the next attempt, // though we may update them here with newer offsets for acked records. ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -570,22 +512,21 @@ public boolean commitOffsets() { // could look a little confusing. } catch (InterruptedException e) { log.warn("{} Flush of offsets interrupted, cancelling", this); - finishFailedFlush(); + offsetWriter.cancelFlush(); recordCommitFailure(time.milliseconds() - started, e); return false; } catch (ExecutionException e) { log.error("{} Flush of offsets threw an unexpected exception: ", this, e); - finishFailedFlush(); + offsetWriter.cancelFlush(); recordCommitFailure(time.milliseconds() - started, e); return false; } catch (TimeoutException e) { log.error("{} Timed out waiting to flush offsets to storage", this); - finishFailedFlush(); + offsetWriter.cancelFlush(); Review comment: So > Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets Sounds good. BTW, the fact that the offset writer continues to track the offsets after failed flush attempts is a subtle thing, and it's worth calling out above (see my comment on lines 476-477) to help explain why we can always replace the `offsetsToCommit` map even after the offset writer failed to flush. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class SubmittedRecordsTest { + + private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka"); + private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "adifferentvalue"); + private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus"); + + private AtomicInteger offset; + + SubmittedRecords submittedRecords; + + @Before + public void setup() { + submittedRecords = new SubmittedRecords(); + offset = new AtomicInteger(); + } + + @Test + public void testNoRecords() { + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + assertEmptyRecords(); + } + + @Test + public void testNoCommittedRecords() { + for (int i = 0; i < 3; i++) { + for (Map<String, 
Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) { + submittedRecords.submit(partition, newOffset()); + } + } + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + } + + @Test + public void testSingleAck() { + Map<String, Object> offset = newOffset(); + + SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset); + // Record has been submitted but not yet acked; cannot commit offsets for it yet + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + submittedRecord.ack(); + // Record has been acked; can commit offsets for it + assertEquals(Collections.singletonMap(PARTITION1, offset), submittedRecords.committableOffsets()); + + // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks + assertEmptyRecords(); + + // Old offsets should be wiped + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + } + + @Test + public void testMultipleAcksAcrossMultiplePartitions() { + Map<String, Object> partition1Offset1 = newOffset(); + Map<String, Object> partition1Offset2 = newOffset(); + Map<String, Object> partition2Offset1 = newOffset(); + Map<String, Object> partition2Offset2 = newOffset(); + + SubmittedRecord partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1); + SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2); + SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1); + SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2); + + // No records ack'd yet; can't commit any offsets + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + partition1Record2.ack(); + // One record has been ack'd, but a record that comes before 
it and corresponds to the same source partition hasn't been + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + partition2Record1.ack(); + // We can commit the first offset for the second partition + assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + // No new offsets to commit + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + partition1Record1.ack(); + partition2Record2.ack(); + // We can commit new offsets for both partitions now + Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>(); + expectedOffsets.put(PARTITION1, partition1Offset2); + expectedOffsets.put(PARTITION2, partition2Offset2); + assertEquals(expectedOffsets, submittedRecords.committableOffsets()); + + // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks + assertEmptyRecords(); + + // No new offsets to commit + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + } + + @Test + public void testRemoveLastSubmittedRecord() { + SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset()); + submittedRecords.remove(submittedRecord); + + // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + submittedRecord.ack(); + // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + } + + @Test + public void testRemoveNotLastSubmittedRecord() { + Map<String, Object> partition1Offset = newOffset(); + Map<String, Object> partition2Offset = newOffset(); + + SubmittedRecord recordToRemove = 
submittedRecords.submit(PARTITION1, partition1Offset); + SubmittedRecord lastSubmittedRecord = submittedRecords.submit(PARTITION2, partition2Offset); + + assertNoEmptyDeques(); + + submittedRecords.remove(recordToRemove); + + assertNoEmptyDeques(); + // The only record for this partition has been removed; we shouldn't be tracking a deque for it anymore + assertRemovedDeques(PARTITION1); + + // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + recordToRemove.ack(); + // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets + assertEquals(Collections.emptyMap(), submittedRecords.committableOffsets()); + + assertNoEmptyDeques(); + + lastSubmittedRecord.ack(); + // Now that the last-submitted record has been ack'd, we should be able to commit its offset + assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), submittedRecords.committableOffsets()); + + // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks + assertEmptyRecords(); Review comment: Also? ```suggestion assertEmptyRecords(); assertNoEmptyDeques(); ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -246,6 +240,7 @@ public void execute() { } maybeThrowProducerSendException(); + updateCommittableOffsets(); Review comment: Actually, I now have a question: why did you choose to add it _before_ the `poll()` (a few lines down) rather than after, perhaps after the `if (!sendRecords()) {...}` block below? 
The reason I ask is that if one loop of the while polls for records and sends them (where they are sent to the producer and asynchronously acked), but then the connector is paused about the same time, then the offsets for those records will not be committed until after the connector is resumed above. Is that intentional? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; + +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. + * Note that this class is not thread-safe. + */ +class SubmittedRecords { + private final Map<Map<String, Object>, Deque<SubmittedRecord>> records; + + public SubmittedRecords() { + this.records = new HashMap<>(); + } + + /** + * Enqueue a new source record before dispatching it to a producer. 
+ * @param record the record about to be dispatched; may not be null but may have a null + * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()} + * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by + * the producer, or {@link #remove removed} if synchronously rejected by the producer + */ + @SuppressWarnings("unchecked") + public SubmittedRecord submit(SourceRecord record) { + return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset()); + } + + // Convenience method for testing + SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) { + SubmittedRecord result = new SubmittedRecord(partition, offset); + records.computeIfAbsent(result.partition(), p -> new LinkedList<>()) + .add(result); + return result; + } + + /** + * Remove a source record and do not take it into account any longer when tracking offsets. + * Useful if the record has been synchronously rejected by the producer. Review comment: > * By clarifying in the Javadoc that, if multiple instances of the same SubmittedRecord have been submitted already, only the first one found (traversing from the end of the deque backward) will be removed I think this would be great. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -495,56 +471,25 @@ public boolean commitOffsets() { long started = time.milliseconds(); long timeout = started + commitTimeoutMs; + Map<Map<String, Object>, Map<String, Object>> offsetsToCommit; synchronized (this) { - // First we need to make sure we snapshot everything in exactly the current state. 
This - // means both the current set of messages we're still waiting to finish, stored in this - // class, which setting flushing = true will handle by storing any new values into a new - // buffer; and the current set of user-specified offsets, stored in the - // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. - flushing = true; - boolean flushStarted = offsetWriter.beginFlush(); - // Still wait for any producer records to flush, even if there aren't any offsets to write - // to persistent storage - - // Next we need to wait for all outstanding messages to finish sending - log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); - while (!outstandingMessages.isEmpty()) { - try { - long timeoutMs = timeout - time.milliseconds(); - // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain, - // we can stop flushing immediately - if (isCancelled() || timeoutMs <= 0) { - log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - this.wait(timeoutMs); - } catch (InterruptedException e) { - // We can get interrupted if we take too long committing when the work thread shutdown is requested, - // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need - // to stop immediately - log.error("{} Interrupted while flushing messages, offsets will not be committed", this); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - } + offsetsToCommit = this.committableOffsets; + this.committableOffsets = new HashMap<>(); + } - if (!flushStarted) { - // There was nothing in the offsets to process, but we still waited for the data in the - // buffer to flush. 
This is useful since this can feed into metrics to monitor, e.g. - // flush time, which can be used for monitoring even if the connector doesn't record any - // offsets. - finishSuccessfulFlush(); - long durationMillis = time.milliseconds() - started; - recordCommitSuccess(durationMillis); - log.debug("{} Finished offset commitOffsets successfully in {} ms", - this, durationMillis); - - commitSourceTask(); - return true; - } + offsetsToCommit.forEach(offsetWriter::offset); + if (!offsetWriter.beginFlush()) { + // There was nothing in the offsets to process, but we still waited for the data in the + // buffer to flush. This is useful since this can feed into metrics to monitor, e.g. Review comment: What do you mean by "we still waited for the data in the buffer to flush"? The `beginFlush()` method doesn't actually do any flushing; it merely performs the snapshot of the offset writer's data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org