*Kafka Version: 0.10.2.1*
Hi,
I am running a custom connector (in distributed mode) and noticed that the
lag on one of the partitions keeps increasing even though the partition is
assigned to a Connect worker. The relevant messages from the Connect log
follow:

[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onPartitionsAssigned - foo.sink-16 assigned topic partition bar-1 with offset 1871179
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask commitOffsets - WorkerSinkTask{id=foo.sink-16} Skipping offset commit, no change since last commit
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Finished WorkerSinkTask{id=foo.sink-16} offset commit successfully in 0 ms
[DEBUG] 2017-09-07 14:32:54,572 runtime.WorkerSinkTask onCommitCompleted - Got callback for timed out commit WorkerSinkTask{id=foo.sink-16}: 2, but most recent commit is 4
[WARN] 2017-09-07 14:32:54,573 runtime.WorkerSinkTask commitOffsets - Ignoring invalid task provided offset bar-1/OffsetAndMetadata{offset=1871179, metadata=''} -- partition not assigned

Looking at the code, it seems that a callback from an earlier async offset
commit can overwrite lastCommittedOffsets with a stale map that does not
contain the newly assigned partition. Every later task-provided offset for
that partition is then ignored (see the WARN message above). The relevant
code snippets are:

    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets,
                          boolean closing, final int seqno) {
        log.info("{} Committing offsets", this);
        if (closing) {
            doCommitSync(offsets, seqno);
        } else {
            OffsetCommitCallback cb = new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                       Exception error) {
                    if (error == null) {
                        lastCommittedOffsets = offsets;  // <-- assignment in question
                    }
                    onCommitCompleted(error, seqno);
                }
            };
            consumer.commitAsync(offsets, cb);
        }
    }

*---*

    private void commitOffsets(long now, boolean closing) {
        <snip>
        .....
        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets =
                new HashMap<>(lastCommittedOffsets);
        if (commitableOffsets.containsKey(partition)) {  // <-- check in question
            if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
                commitableOffsets.put(partition, taskProvidedOffset);
            } else {
                log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed",
                         partition, taskProvidedOffset);
            }
        } else {
            // <-- branch that produced the WARN message in the log above
            log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned",
                     partition, taskProvidedOffset);
        }
        <snip>
        ...
    }

*-----*
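
To make the suspected sequence concrete, here is a minimal, self-contained
sketch of how I read the two snippets. It is not the real WorkerSinkTask:
the class and method names are made up, partitions are plain strings,
offsets are plain longs, the earlier partition bar-0 and its offset 100 are
hypothetical, and I am assuming that a rebalance resets both maps to the
new assignment.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the offset bookkeeping in WorkerSinkTask; not the real class.
class StaleCommitCallbackDemo {
    // Partition name -> offset; stands in for Map<TopicPartition, OffsetAndMetadata>.
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();
    private Map<String, Long> currentOffsets = new HashMap<>();

    // Assumption: a rebalance resets both maps and seeds them with the new assignment.
    void onPartitionsAssigned(Map<String, Long> positions) {
        lastCommittedOffsets = new HashMap<>(positions);
        currentOffsets = new HashMap<>(positions);
    }

    // Same shape as the callback in doCommit: on success the whole map is replaced.
    void onAsyncCommitSuccess(Map<String, Long> committed) {
        lastCommittedOffsets = committed;
    }

    // Same shape as the check in commitOffsets; true if the offset would be used.
    boolean acceptsTaskProvidedOffset(String partition, long offset) {
        Map<String, Long> commitableOffsets = new HashMap<>(lastCommittedOffsets);
        if (!commitableOffsets.containsKey(partition)) {
            return false; // the "partition not assigned" branch
        }
        return offset <= currentOffsets.get(partition); // otherwise "not yet consumed"
    }

    // Runs the suspected sequence; returns {accepted before stale callback, accepted after}.
    static boolean[] runScenario() {
        StaleCommitCallbackDemo task = new StaleCommitCallbackDemo();

        // Hypothetical earlier assignment; an async commit for bar-0 is still in flight.
        Map<String, Long> inFlightCommit = new HashMap<>();
        inFlightCommit.put("bar-0", 100L);
        task.onPartitionsAssigned(inFlightCommit);

        // Rebalance: the task now owns bar-1 at offset 1871179, as in the log.
        Map<String, Long> newAssignment = new HashMap<>();
        newAssignment.put("bar-1", 1871179L);
        task.onPartitionsAssigned(newAssignment);
        boolean before = task.acceptsTaskProvidedOffset("bar-1", 1871179L);

        // The callback of the older commit fires late and replaces the map.
        task.onAsyncCommitSuccess(inFlightCommit);
        boolean after = task.acceptsTaskProvidedOffset("bar-1", 1871179L);

        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = runScenario();
        System.out.println("accepted before stale callback: " + result[0]);
        System.out.println("accepted after stale callback:  " + result[1]);
    }
}
```

In this sketch the offset for bar-1 is accepted right after the assignment
and rejected once the late callback has replaced the map, which matches the
"partition not assigned" warning I am seeing.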
Is my understanding correct, and does this look like a bug?
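
If it is, one possible guard might be to let the callback update
lastCommittedOffsets only when its sequence number is still the most recent
one. The following is only a sketch of that idea, not a tested patch: the
class and its methods are hypothetical, partitions and offsets are
simplified to strings and longs, and the numbers 2 and 4 are taken from the
"Got callback for timed out commit" message in the log.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical guard around lastCommittedOffsets; not the actual WorkerSinkTask code.
class SeqnoGuardedOffsets {
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();
    private int commitSeqno = 0; // sequence number of the most recent commit attempt

    // Assumption: a rebalance seeds the map with the new assignment.
    void onPartitionsAssigned(Map<String, Long> positions) {
        lastCommittedOffsets = new HashMap<>(positions);
    }

    // Every commit attempt gets the next sequence number.
    int startCommit() {
        return ++commitSeqno;
    }

    // Applies the committed offsets only if this callback belongs to the latest commit.
    boolean onAsyncCommitSuccess(Map<String, Long> committed, int seqno) {
        if (seqno != commitSeqno) {
            return false; // stale callback: leave lastCommittedOffsets untouched
        }
        lastCommittedOffsets = new HashMap<>(committed);
        return true;
    }

    boolean tracks(String partition) {
        return lastCommittedOffsets.containsKey(partition);
    }

    // Replays the logged situation: callback for commit 2 arrives when the latest is 4.
    // Returns {stale callback applied, bar-1 still tracked, current callback applied}.
    static boolean[] runScenario() {
        SeqnoGuardedOffsets state = new SeqnoGuardedOffsets();
        state.startCommit();
        int staleSeqno = state.startCommit(); // 2

        Map<String, Long> staleOffsets = new HashMap<>();
        staleOffsets.put("bar-0", 100L); // hypothetical earlier partition

        Map<String, Long> newAssignment = new HashMap<>();
        newAssignment.put("bar-1", 1871179L);
        state.onPartitionsAssigned(newAssignment);
        state.startCommit();
        int latestSeqno = state.startCommit(); // 4

        boolean staleApplied = state.onAsyncCommitSuccess(staleOffsets, staleSeqno);
        boolean stillTracked = state.tracks("bar-1");
        boolean currentApplied = state.onAsyncCommitSuccess(newAssignment, latestSeqno);
        return new boolean[] {staleApplied, stillTracked, currentApplied};
    }
}
```

With this guard the late callback is dropped and bar-1 stays in the map.
Note that it only helps if a newer commit was started after the stale one,
as in the log; whether this fits the actual design I cannot say.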
--
Shrijeet