Per Steffensen, getting sequence numbers correct is definitely difficult,
but this is not Connect's fault. I'd like to see Connect implement
exactly-once end-to-end, but that requires coordination between
sources and sinks along the lines you allude to, using sequence
numbers and transactions and whatnot.

The problem with commit() is knowing when it's okay to delete the files in
your example. I don't believe that issue has anything to do with avoiding
dupes or assigning unique sequence numbers. I believe it is safe to delete
a file if you know it has been delivered successfully, which the present
API exposes.
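
For illustration, here is a minimal sketch of what I mean (the class and
the "file" partition key are hypothetical, not from your example):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Sketch: delete each input file as soon as the record built from it
    // has been acknowledged by the downstream Kafka cluster.
    public abstract class FileDeletingSourceTask extends SourceTask {

        @Override
        public void commitRecord(SourceRecord record) throws InterruptedException {
            // commitRecord() fires once the producer has acknowledged this
            // record, i.e. it was delivered successfully -- which is all
            // you need before deleting its file, flushed offsets or not.
            String file = (String) record.sourcePartition().get("file");
            try {
                Files.deleteIfExists(Paths.get(file));
            } catch (IOException e) {
                throw new RuntimeException("failed to delete " + file, e);
            }
        }
    }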

That said, I'm not opposed to your proposed callbacks, and I agree that
commit() and commitRecord() are poorly named. I just don't believe the
present API is incorrect.

Ryanne



On Thu, Oct 18, 2018 at 7:04 AM Per Steffensen <perst...@gmail.com> wrote:

> On 17/10/2018 18.17, Ryanne Dolan wrote:
>
> > > this does not guarantee that the
> > > offsets of R have been written/flushed at the next commit() call
> >
> > True, but does it matter? So long as you can guarantee the records are
> > delivered to the downstream Kafka cluster, it shouldn't matter if they have
> > been committed or not.
> >
> > The worst that can happen is that the worker gets bounced and asks for the
> > same records a second time. Even if those records have since been dropped
> > from the upstream data source, it doesn't matter cuz you know they were
> > previously delivered successfully.
>
> You are kinda arguing that offsets are not usable at all. I think they
> are. Below I will explain a fairly simple source-connector, how it would
> be misled by the way the source-connector framework currently works, and
> how my fix would prevent that. The source-connector is picked out of thin
> air, but not too far from what I have had to deal with in real life.
>
> Let's assume I write a fairly simple source-connector that picks up data
> from files in a given folder. For simplicity, let's assume that each
> file fits in a single Kafka message. My source-connector just sorts the
> files by timestamp and sends out the data in the files, oldest file first.
> It is possible that the receiving side will get the same data twice, for
> one of the following reasons:
> * There were actually two input-files that contained exactly the same data
> (in that case the receiving side should handle it twice)
> * The data from the same file was sent in two Kafka messages, because
> global atomicity is impossible (in that case the receiving side should
> only handle the data once)
> In order to allow the receiving side to know when two consecutive messages
> are essentially the same, so that it will know to handle only one of them,
> I introduce a simple sequence-numbering system in my source-connector. I
> simply write a sequence-number in the Kafka messages, and I use
> Kafka Connect offsets to keep track of the next sequence-number to be used,
> so that I can pick up with the correct sequence-number in case of a
> crash/restart. If there are no offsets when the source-connector starts
> (first start), it will just start with sequence-number 1.
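>
> As a sketch (the "folder" and "nextSeq" key names are my own invention,
> nothing the framework dictates), the bootstrap and record construction
> inside the task look like:
>
>     // In start()/poll(): resume from the committed offsets, or start at 1.
>     Map<String, Object> partition = Collections.singletonMap("folder", inputFolder);
>     Map<String, Object> committed = context.offsetStorageReader().offset(partition);
>     long nextSeq = (committed == null) ? 1L : (Long) committed.get("nextSeq");
>
>     // Each record carries its seq in the value and the *next* seq in its
>     // source offsets, which the framework commits on the task's behalf.
>     Map<String, Object> sourceOffset = Collections.singletonMap("nextSeq", nextSeq + 1);
>     SourceRecord r = new SourceRecord(partition, sourceOffset, topic,
>             Schema.STRING_SCHEMA, "{seq: " + nextSeq + ", data: <file data>}");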
>
> *Assume the following files are in the input-folder:*
> * 2018-01-01_10_00_00-<GUID1>.data
> * 2018-01-01_10_00_00-<GUID2>.data
> * 2018-01-01_10_00_01-<GUID3>.data
> * 2018-01-01_10_00_02-<GUID4>.data
> …
>
> *Now this sequence of events is possible:*
> * mySourceConnector.poll() -> [
>     R1 = record({seq: 1, data=<data from 2018-01-01_10_00_00-<GUID1>.data>}, { nextSeq=2 }),
>     R2 = record({seq: 2, data=<data from 2018-01-01_10_00_00-<GUID2>.data>}, { nextSeq=3 })
> ]
> * data of R1 was sent and acknowledged
> * mySourceConnector.commitRecord(R1)
> * data of R2 was sent and acknowledged
> * mySourceConnector.commitRecord(R2)
> * offsets-committer kicks in around here and picks up the offsets from R1
> and R2, so the merged offsets to be written and flushed are { nextSeq=3 }
> * mySourceConnector.poll() -> [
>     R3 = record({seq: 3, data=<data from 2018-01-01_10_00_01-<GUID3>.data>}, { nextSeq=4 })
> ]
> * data of R3 was sent and acknowledged
> * mySourceConnector.commitRecord(R3)
> * offsets-committer finishes writing and flushing offsets { nextSeq=3 }
> * mySourceConnector.commit()
>
> In my mySourceConnector.commit() implementation I believe that the data
> and offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged,
> and therefore I delete the following files:
> * 2018-01-01_10_00_00-<GUID1>.data
> * 2018-01-01_10_00_00-<GUID2>.data
> * 2018-01-01_10_00_01-<GUID3>.data
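>
> In code, that commit() implementation is something like this sketch
> (filesSentSoFar is hypothetical bookkeeping of the files behind every
> record returned from poll() so far):
>
>     // Sketch: I assume that by the time commit() is called, the data AND
>     // offsets of every record returned from poll() have been flushed --
>     // which, as shown below, is not guaranteed.
>     @Override
>     public void commit() throws InterruptedException {
>         for (File f : filesSentSoFar) {  // hypothetical bookkeeping
>             f.delete();
>         }
>         filesSentSoFar.clear();
>     }
>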
> But the truth is that the data for R1, R2 and R3 has been sent with
> sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say
> { nextSeq=3 }, not the { nextSeq=4 } I would implicitly expect.
> If the system crashes here, upon restart I will get { nextSeq=3 }, but the
> file containing the data supposed to get sequence-number 3 has already
> been deleted. Therefore I will end up with this next poll:
> * poll() -> [
>     R4 = record({seq: 3, data=<data from 2018-01-01_10_00_02-<GUID4>.data>}, { nextSeq=4 })
> ]
> If my system had worked, I would have ended up with this next poll:
> * poll() -> [
>     R4 = record({seq: 4, data=<data from 2018-01-01_10_00_02-<GUID4>.data>}, { nextSeq=5 })
> ]
> The receiving side of my data will get two messages containing the same
> sequence-number 3. It will therefore incorrectly ignore the second message.
> It cannot even double-check by looking at the actual data of the two
> messages: if the content of <data from 2018-01-01_10_00_01-<GUID3>.data>
> and <data from 2018-01-01_10_00_02-<GUID4>.data> were actually identical,
> it would have no way of figuring out that the right thing is to handle
> both messages.
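>
> The receiving side's dedup is necessarily something like this sketch
> (extractSeq and handle are hypothetical helpers):
>
>     // Sketch: a repeat of the previous sequence-number is taken to be a
>     // retransmission of the same data, and is handled only once.
>     long lastSeq = -1;
>     for (ConsumerRecord<String, String> msg : consumer.poll(Duration.ofSeconds(1))) {
>         long seq = extractSeq(msg.value());  // hypothetical helper
>         if (seq == lastSeq) continue;        // assumed duplicate -- dropped
>         handle(msg);                         // hypothetical helper
>         lastSeq = seq;
>     }
>
> With the bug above, the data from 2018-01-01_10_00_02-<GUID4>.data arrives
> as a second seq 3 and is wrongly dropped here.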
>
> *With my fix to the problem*, the call to commit() would have been
> mySourceConnector.commit([R1, R2])
> I would then know to delete only the following files:
> * 2018-01-01_10_00_00-<GUID1>.data
> * 2018-01-01_10_00_00-<GUID2>.data
> And after a crash/restart I would end up sending the correct next message:
> mySourceConnector.poll() -> [
>     R3 = record({seq: 3, data=<data from 2018-01-01_10_00_01-<GUID3>.data>}, { nextSeq=4 })
> ]
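>
> The proposed callback would look something like this sketch
> (deleteFileBehind is a hypothetical helper mapping a record back to its
> input file):
>
>     // Proposed (sketch): the framework passes exactly those records whose
>     // offsets have now been written and flushed, so only their input
>     // files are safe to delete.
>     public void commit(List<SourceRecord> recordsWithFlushedOffsets) {
>         for (SourceRecord r : recordsWithFlushedOffsets) {
>             deleteFileBehind(r);  // hypothetical helper
>         }
>     }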
>
>
