rhauch commented on a change in pull request #11323:
URL: https://github.com/apache/kafka/pull/11323#discussion_r740478689
##########
File path:
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -474,9 +479,24 @@ public boolean commitOffsets() {
long timeout = started + commitTimeoutMs;
Map<Map<String, Object>, Map<String, Object>> offsetsToCommit;
+ SubmittedRecords.Pending pendingMetadataForCommit;
synchronized (this) {
offsetsToCommit = this.committableOffsets;
this.committableOffsets = new HashMap<>();
+ pendingMetadataForCommit = this.pendingRecordsMetadata;
+ this.pendingRecordsMetadata = null;
+ }
+
+ if (pendingMetadataForCommit != null) {
+ log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+ + "The source partition with the most pending messages is {}, with {} pending messages",
+ pendingMetadataForCommit.totalPendingMessages(),
+ pendingMetadataForCommit.numDeques(),
+ pendingMetadataForCommit.largestDequePartition(),
+ pendingMetadataForCommit.largestDequeSize()
+ );
+ } else {
+ log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");
Review comment:
As you point out, the old log message was:
```
log.info("{} flushing {} outstanding messages for offset commit", this,
outstandingMessages.size());
```
This log message had two things it'd be nice to keep:
1. `this` as the context; and
2. the number of records whose offsets were being committed (i.e., the
number of acked records).
I think both would be good to include, especially if we're saying the number
of records whose offsets are _not_ being committed (yet).
The `Pending` class seems pretty useful, but computing the number of acked
records is not possible here. WDYT about merging the
`SubmittedRecords.committableOffsets()` and `pending()` methods, by having the
former return an object that contains the offset map _and_ the metadata that
can be used for logging? This class would be like `Pending`, though maybe
`CommittableOffsets` is a more apt name. Plus, `WorkerSourceTask` would only
have one volatile field that is updated atomically.
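
A minimal sketch of what such a merged return type could look like, purely for illustration. Everything here is an assumption rather than existing Kafka code: the field and method names (`numCommittableMessages`, `numPendingMessages`, `describe`, `takeForCommit`), the `OffsetHolder` stand-in for `WorkerSourceTask`, and the log wording are all hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable snapshot: the offsets to commit plus the counts needed for logging.
final class CommittableOffsets {
    static final CommittableOffsets EMPTY =
            new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);

    private final Map<Map<String, Object>, Map<String, Object>> offsets;
    private final int numCommittableMessages;   // acked records whose offsets are in the map
    private final int numPendingMessages;       // records not yet acked, so not committable
    private final int numDeques;                // source partitions that still have pending records
    private final int largestDequeSize;
    private final Map<String, Object> largestDequePartition;

    CommittableOffsets(Map<Map<String, Object>, Map<String, Object>> offsets,
                       int numCommittableMessages,
                       int numPendingMessages,
                       int numDeques,
                       int largestDequeSize,
                       Map<String, Object> largestDequePartition) {
        // Defensive copy so the snapshot cannot change after it is published.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
        this.numCommittableMessages = numCommittableMessages;
        this.numPendingMessages = numPendingMessages;
        this.numDeques = numDeques;
        this.largestDequeSize = largestDequeSize;
        this.largestDequePartition = largestDequePartition;
    }

    Map<Map<String, Object>, Map<String, Object>> offsets() { return offsets; }
    int numCommittableMessages() { return numCommittableMessages; }
    int numPendingMessages() { return numPendingMessages; }
    int numDeques() { return numDeques; }
    int largestDequeSize() { return largestDequeSize; }
    Map<String, Object> largestDequePartition() { return largestDequePartition; }
    boolean hasPending() { return numPendingMessages > 0; }

    // Builds one log line that keeps the context (e.g. the task) and reports both counts.
    String describe(Object context) {
        if (hasPending()) {
            return String.format(
                    "%s Committing offsets for %d acknowledged messages; %d pending messages across "
                            + "%d source partitions will not be committed yet. The source partition "
                            + "with the most pending messages is %s, with %d pending messages",
                    context, numCommittableMessages, numPendingMessages, numDeques,
                    largestDequePartition, largestDequeSize);
        }
        return String.format(
                "%s Committing offsets for %d acknowledged messages; no messages are pending",
                context, numCommittableMessages);
    }
}

// Hypothetical stand-in for the task: a single field replaces the two that the diff swaps separately.
final class OffsetHolder {
    private volatile CommittableOffsets committable = CommittableOffsets.EMPTY;

    synchronized void update(CommittableOffsets latest) {
        committable = latest;
    }

    // Hands the current snapshot to the committer and resets the field in one step.
    synchronized CommittableOffsets takeForCommit() {
        CommittableOffsets result = committable;
        committable = CommittableOffsets.EMPTY;
        return result;
    }
}
```

With something along these lines, `commitOffsets()` would take one snapshot under the lock and could log both the acked and the pending counts from it, with `this` passed as the context.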