Per Steffensen created KAFKA-5716:
-------------------------------------

             Summary: Connect: When SourceTask.commit is called, it is possible 
that not everything from SourceTask.poll has been sent
                 Key: KAFKA-5716
                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
             Project: Kafka
          Issue Type: Bug
            Reporter: Per Steffensen
            Priority: Minor


Not looking at the very latest code, so the "problem" may have been corrected 
recently. If so, I apologize. I found the "problem" by code-inspection alone, 
so I may be wrong. Have not had the time to write tests to confirm.

According to java-doc on SourceTask.commit
{quote}
Commit the offsets, up to the offsets that have been returned by \{@link 
#poll()}. This
method should block until the commit is complete.

SourceTasks are not required to implement this functionality; Kafka Connect 
will record offsets
automatically. This hook is provided for systems that also need to store 
offsets internally
in their own system.
{quote}

As I read this, when commit-method is called, the SourceTask-developer is 
"told" that everything returned from poll up until "now" has been sent/stored - 
both the outgoing messages and the associated connect-offsets. Looking at the 
implementation it also seems that this is what it tries to "guarantee/achieve".

But as I read the code, this is not necessarily true.
The following threads are involved:
* Task-thread: WorkerSourceTask has its own thread running 
WorkerSourceTask.execute.
* Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled to 
call WorkerSourceTask.commitOffsets (from a different thread)

The two threads synchronize (on the WorkerSourceTask-object) in sendRecords and 
commitOffsets respectively. This prevents the task-thread from adding to 
outstandingMessages and offsetWriter while the committer-thread is marking what 
has to be flushed in the offsetWriter and waiting for outstandingMessages to 
become empty. This means that the offsets committed will be consistent with 
what has been sent out, but not necessarily with what has been polled. At least 
I do not see why the following is not possible:
* Task-thread polls some records from task.poll
* Before task-thread gets to add (all) the polled records to 
outstandingMessages and offsetWriter in sendRecords, committer-thread kicks in 
and does its committing, while preventing the task-thread from adding the 
polled records to outstandingMessages and offsetWriter
* Consistency will not have been compromised, but committer-thread will end up 
calling task.commit (via WorkerSourceTask.commitSourceTask) before the 
records just polled from task.poll have been sent or the corresponding 
connector-offsets flushed.
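
The interleaving above can be sketched with a small, deterministic model. This 
is only an illustration under assumptions: CommitRaceSketch and its methods 
(sendRecord, ack, commitOffsets, polledButNotCommitted) are hypothetical 
stand-ins that I made up for this sketch, not the real WorkerSourceTask code, 
and records are plain strings. The steps are executed in the suspected 
problematic order on one thread, so no real race is needed to see the effect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the worker. NOT the real WorkerSourceTask.
class CommitRaceSketch {
    private final List<String> outstandingMessages = new ArrayList<>(); // sent, not yet acked
    private final List<String> offsetsToFlush = new ArrayList<>();      // stand-in for offsetWriter
    private final List<String> committedOffsets = new ArrayList<>();

    // Task-thread side: register one polled record as "being sent".
    synchronized void sendRecord(String record) {
        outstandingMessages.add(record);
        offsetsToFlush.add(record);
    }

    // Producer-callback side: the record has been acknowledged.
    synchronized void ack(String record) {
        outstandingMessages.remove(record);
    }

    // Committer-thread side: flush the offsets of everything registered so far.
    // The model simply requires that nothing is outstanding instead of waiting.
    synchronized List<String> commitOffsets() {
        if (!outstandingMessages.isEmpty()) {
            throw new IllegalStateException("model assumes all sent records are acked");
        }
        committedOffsets.addAll(offsetsToFlush);
        offsetsToFlush.clear();
        return new ArrayList<>(committedOffsets);
    }

    // Runs the suspected interleaving and returns the records that were
    // returned by poll but are not covered when task.commit would be called.
    static List<String> polledButNotCommitted() {
        CommitRaceSketch worker = new CommitRaceSketch();
        List<String> polled = Arrays.asList("r1", "r2", "r3"); // result of task.poll

        // Task-thread has only registered the first polled record so far.
        worker.sendRecord(polled.get(0));
        worker.ack(polled.get(0));

        // Committer-thread kicks in here; task.commit would follow right after.
        List<String> committed = worker.commitOffsets();

        List<String> missing = new ArrayList<>(polled);
        missing.removeAll(committed);
        return missing;
    }

    public static void main(String[] args) {
        System.out.println("polled but not committed: " + polledButNotCommitted());
    }
}
```

In this model the committed offsets are consistent with what was sent (r1), 
yet r2 and r3 were already returned from poll when the commit happens.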

If I am right, I guess there are two ways to fix it
* Either change the java-doc of SourceTask.commit to something like the 
following (which I do believe is true)
{quote}
Commit the offsets, up to the offsets that have been returned by \{@link 
#poll()}
*and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
This method should block until the commit is complete.

SourceTasks are not required to implement this functionality; Kafka Connect 
will record offsets
automatically. This hook is provided for systems that also need to store 
offsets internally
in their own system.
{quote}
* or, fix the "problem" so that it actually does what the java-doc says :-)
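
If the first option is chosen, a SourceTask-developer who needs to store 
offsets in his own system could track what has been confirmed via commitRecord 
and only store that in commit. Below is a minimal sketch of that idea, under 
assumptions: AckTrackingTaskSketch is a hypothetical stand-alone class (it does 
not extend the real SourceTask), offsets are plain long values instead of 
SourceRecord objects, and I assume confirmations may arrive out of order, so 
only the contiguous confirmed prefix is stored.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch. Method names mirror SourceTask, but this is not the real API.
class AckTrackingTaskSketch {
    private long nextOffset = 0;        // next offset to hand out from poll
    private long confirmedUpTo = -1;    // highest offset with all earlier ones confirmed
    private final Set<Long> confirmedAhead = new HashSet<>(); // confirmed, but with gaps before
    private long storedOffset = -1;     // what would be written to the task's own system

    // Returns the offset of the next "record".
    synchronized long poll() {
        return nextOffset++;
    }

    // Called once per record when it has been confirmed as sent.
    synchronized void commitRecord(long offset) {
        confirmedAhead.add(offset);
        // Advance over every offset that is now confirmed without a gap.
        while (confirmedAhead.remove(confirmedUpTo + 1)) {
            confirmedUpTo++;
        }
    }

    // Stores only what has been polled AND confirmed, not everything polled.
    synchronized void commit() {
        storedOffset = confirmedUpTo;
    }

    synchronized long storedOffset() {
        return storedOffset;
    }

    // Polls three records and commits after each confirmation.
    static long[] demo() {
        AckTrackingTaskSketch task = new AckTrackingTaskSketch();
        task.poll();
        task.poll();
        task.poll();            // offsets 0, 1 and 2 have been polled

        task.commitRecord(0);
        task.commit();
        long first = task.storedOffset();   // only offset 0 is confirmed

        task.commitRecord(2);
        task.commit();
        long second = task.storedOffset();  // offset 1 is still missing

        task.commitRecord(1);
        task.commit();
        long third = task.storedOffset();   // 0, 1 and 2 are all confirmed

        return new long[] {first, second, third};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

With this approach commit never stores an offset for a record that has been 
polled but not yet confirmed, regardless of when the committer-thread runs.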

If I am not right, of course I apologize for the inconvenience. I would 
appreciate an explanation where my code-inspection is not correct, and why it 
works even though I cannot see it. I will not expect such an explanation, 
though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
