On 17/10/2018 18.17, Ryanne Dolan wrote:
>> this does not guarantee that the
>> offsets of R have been written/flushed at the next commit() call
> True, but does it matter? So long as you can guarantee the records are
> delivered to the downstream Kafka cluster, it shouldn't matter if they
> have been committed or not.
>
> The worst that can happen is that the worker gets bounced and asks for
> the same records a second time. Even if those records have since been
> dropped from the upstream data source, it doesn't matter cuz you know
> they were previously delivered successfully.
You are kind of arguing that offsets are not usable at all. I think they
are. Below I will describe a fairly simple source-connector, show how it
would be misled by the way the source-connector framework currently
works, and how my fix would help it not be. The source-connector is made
up, but not far from what I have had to deal with in real life.
Let's assume I write a fairly simple source-connector that picks up data
from files in a given folder. For simplicity, let's assume that each
file fits in a single Kafka message. My source-connector sorts the files
by timestamp and sends out the data in the files, oldest file first. The
receiving side of the data my source-connector sends out may get the
same data twice, for one of the following reasons:
* There were actually two input files that contained exactly the same
data (in that case the receiving side should handle it twice)
* The data from the same file was sent twice, in two Kafka messages,
because global atomicity is impossible (in that case the receiving side
should only handle the data once)
In order to allow the receiving side to know when two consecutive
messages are essentially the same, so that it only handles one of them,
I introduce a simple sequence-numbering scheme in my source-connector. I
write a sequence-number into each Kafka message, and I use Kafka Connect
offsets to keep track of the next sequence-number to be used, so that I
can resume with the correct sequence-number after a crash/restart. If
there are no offsets when the source-connector starts (first start), it
just starts with sequence-number 1.
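
As a rough sketch of what I mean (the class name, config keys and exact
schema are made up for illustration, and error handling is minimal), the
task puts the sequence-number into the record value and the next
sequence-number into the Connect source offset:

import java.io.File;
import java.nio.file.Files;
import java.util.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class FolderSourceTask extends SourceTask {

    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .field("seq", Schema.INT64_SCHEMA)
            .field("data", Schema.BYTES_SCHEMA)
            .build();

    private File inputFolder;  // from the task config
    private String topic;      // from the task config
    private long nextSeq = 1;  // 1 on first start, otherwise restored from the flushed offsets

    @Override
    public void start(Map<String, String> props) {
        inputFolder = new File(props.get("input.folder"));
        topic = props.get("topic");
        // Restore the sequence-number from the last flushed offsets, if there are any.
        Map<String, Object> offset = context.offsetStorageReader()
                .offset(Collections.singletonMap("folder", inputFolder.getPath()));
        if (offset != null && offset.get("nextSeq") != null) {
            nextSeq = ((Number) offset.get("nextSeq")).longValue();
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        File[] files = inputFolder.listFiles();  // tracking of already-polled files omitted here
        if (files == null || files.length == 0) return null;
        Arrays.sort(files, Comparator.comparing(File::getName)); // timestamp prefix => oldest first

        List<SourceRecord> records = new ArrayList<>();
        for (File file : files) {
            Map<String, ?> partition = Collections.singletonMap("folder", inputFolder.getPath());
            // The offset carries the sequence-number to resume with after a crash/restart.
            Map<String, ?> offset = Collections.singletonMap("nextSeq", nextSeq + 1);
            try {
                Struct value = new Struct(VALUE_SCHEMA)
                        .put("seq", nextSeq)
                        .put("data", Files.readAllBytes(file.toPath()));
                records.add(new SourceRecord(partition, offset, topic, VALUE_SCHEMA, value));
            } catch (java.io.IOException e) {
                throw new ConnectException("Failed to read " + file, e);
            }
            nextSeq++;
        }
        return records;
    }

    @Override
    public String version() { return "sketch"; }

    @Override
    public void stop() { }
}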
*Assume the following files are in the input-folder:*
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
* 2018-01-01_10_00_01-<GUID3>.data
* 2018-01-01_10_00_02-<GUID4>.data
…
*Now this sequence of events is possible:*
* mySourceConnector.poll() -> [
    R1 = record({ seq=1, data=<data from 2018-01-01_10_00_00-<GUID1>.data> }, { nextSeq=2 }),
    R2 = record({ seq=2, data=<data from 2018-01-01_10_00_00-<GUID2>.data> }, { nextSeq=3 })
  ]
* data of R1 was sent and acknowledged
* mySourceConnector.commitRecord(R1)
* data of R2 was sent and acknowledged
* mySourceConnector.commitRecord(R2)
* the offsets-committer kicks in around here and picks up the offsets
from R1 and R2, so the merged offsets to be written and flushed are
{ nextSeq=3 }
* mySourceConnector.poll() -> [
    R3 = record({ seq=3, data=<data from 2018-01-01_10_00_01-<GUID3>.data> }, { nextSeq=4 })
  ]
* data of R3 was sent and acknowledged
* mySourceConnector.commitRecord(R3)
* offsets-committer finishes writing and flushing offsets { nextSeq=3 }
* mySourceConnector.commit()
In my mySourceConnector.commit() implementation I believe that the data
and offsets for R1, R2 and R3 have been sent/written/flushed/acknowledged,
and therefore I delete the following files (a sketch of that commit()
follows the list):
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
* 2018-01-01_10_00_01-<GUID3>.data
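
To make the failure mode concrete, my commit()/commitRecord() pair on
that same task looks roughly like this; fileFor() is an illustrative
helper that maps a record back to the file it was read from. The whole
point is the (currently wrong) assumption that everything passed to
commitRecord() has had its offsets flushed by the time commit() runs:

    // On the same task class as the sketch above; fileFor() is illustrative only.
    private final Queue<File> acked = new java.util.concurrent.ConcurrentLinkedQueue<>();

    @Override
    public void commitRecord(SourceRecord record) throws InterruptedException {
        // Called by the framework once the record is acknowledged by the downstream cluster.
        acked.add(fileFor(record));
    }

    @Override
    public void commit() throws InterruptedException {
        // Assumption (wrong today): offsets for every acknowledged record have already
        // been written and flushed, so the corresponding input files can be deleted.
        // In the scenario above this also deletes GUID3's file even though { nextSeq=4 }
        // was not part of the flush.
        File file;
        while ((file = acked.poll()) != null) {
            if (!file.delete()) {
                // deletion failure handling omitted in this sketch
            }
        }
    }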
But the truth is that the data for R1, R2 and R3 has been sent with
sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say
{ nextSeq=3 } and not { nextSeq=4 }, which I would indirectly expect.
If the system crashes here, upon restart I will get { nextSeq=3 }, but
the file containing the data that was supposed to get sequence-number 3
has already been deleted. Therefore I will end up with this next poll:
* poll() -> [
    R4 = record({ seq=3, data=<data from 2018-01-01_10_00_02-<GUID4>.data> }, { nextSeq=4 })
  ]
If my system had worked, I would have ended up with this next poll:
* poll() -> [
    R4 = record({ seq=4, data=<data from 2018-01-01_10_00_02-<GUID4>.data> }, { nextSeq=5 })
  ]
The receiving side of my data will get two messages containing the same
sequence-number 3. It will therefore incorrectly ignore the second
message. Even if it double-checks by looking at the actual data of the
two messages, then in the case where the content of <data from
2018-01-01_10_00_01-<GUID3>.data> and <data from
2018-01-01_10_00_02-<GUID4>.data> happened to be identical, it has no
way of figuring out that the right thing to do is to actually handle
both messages.
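
For reference, the dedup on the receiving side is nothing more than a
"skip any sequence-number we have already seen" check, roughly like the
sketch below (the consumer plumbing and deserialization are assumed).
That is exactly why it cannot recover once two genuinely different
messages arrive carrying the same sequence-number:

public class DedupHandler {

    private long lastSeq = 0;  // highest sequence-number handled so far

    public void onMessage(long seq, byte[] data) {
        if (seq <= lastSeq) {
            // Looks like a redelivery of an already-handled message, so it is skipped.
            // If the connector re-used seq 3 for genuinely new data, that data is lost here.
            return;
        }
        handle(data);
        lastSeq = seq;
    }

    private void handle(byte[] data) {
        // actual business logic goes here
    }
}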
*With my fix to the problem*, the call to commit() would have been
mySourceConnector.commit([R1, R2]), and I would have known only to
delete the following files:
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
And after a crash/restart I would end up sending the correct next message:
mySourceConnector.poll() -> [
    R3 = record({ seq=3, data=<data from 2018-01-01_10_00_01-<GUID3>.data> }, { nextSeq=4 })
  ]
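
For completeness, this is roughly how I would implement the proposed
callback on the task; the List-taking commit() signature is the change I
am suggesting, not an existing Connect API, and fileFor() is the same
illustrative helper as in the earlier sketch:

    // Proposed callback, not part of the current Connect API: the framework would pass
    // exactly the records whose offsets were included in the flush that just completed.
    public void commit(List<SourceRecord> flushedRecords) throws InterruptedException {
        for (SourceRecord record : flushedRecords) {
            // In the scenario above this is only ever called with [R1, R2], so only
            // GUID1's and GUID2's files get deleted; GUID3's file survives the crash.
            File file = fileFor(record);
            if (!file.delete()) {
                // deletion failure handling omitted
            }
        }
    }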