On 17/10/2018 18.17, Ryanne Dolan wrote:
>> this does not guarantee that the
>> offsets of R have been written/flushed at the next commit() call

> True, but does it matter? So long as you can guarantee the records are delivered to the downstream Kafka cluster, it shouldn't matter if they have been committed or not.

> The worst that can happen is that the worker gets bounced and asks for the same records a second time. Even if those records have since been dropped from the upstream data source, it doesn't matter cuz you know they were previously delivered successfully.

You are kinda arguing that offsets are not usable at all. I think they are. Below I will describe a fairly simple source-connector, show how it would be misled by the way the source-connector framework currently works, and show how my fix would prevent that. The source-connector is picked out of thin air, but it is not too far from what I have had to deal with in real life.

Let's assume I write a fairly simple source-connector that picks up data from files in a given folder. For simplicity, assume that each file fits in one Kafka message. My source-connector sorts the files by timestamp and sends out the data in the files, oldest file first. The receiving side of the data my source-connector sends out may get the same data twice, for one of the following reasons:
* There were actually two input-files that contained exactly the same data (in that case the receiving side should handle it twice)
* The data from the same file was sent in two Kafka messages, because global atomicity is impossible (in that case the receiving side should only handle the data once)

In order to allow the receiving side to know when two consecutive messages are essentially the same, so that it knows to handle only one of them, I introduce a simple sequence-numbering scheme in my source-connector. I write a sequence-number into each Kafka message, and I use Kafka Connect offsets to keep track of the next sequence-number to be used, so that I can pick up with the correct sequence-number after a crash/restart. If there are no offsets when the source-connector starts (first start), it simply starts with sequence-number 1.
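
To make this concrete, here is a minimal sketch of such a task in Java. The class name, topic/folder names and the two helpers oldestUnsentFiles() and readAll() are hypothetical, made up for this example; the point is only how the sequence-number travels in the record value while { nextSeq } travels in the source offsets:

    import java.nio.file.Path;
    import java.util.*;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public abstract class FileSeqSourceTask extends SourceTask {
        private static final Map<String, String> PARTITION = Collections.singletonMap("input", "my-folder");
        private long nextSeq = 1;

        // Hypothetical helpers (not part of any real API): list the not-yet-sent
        // files oldest-first, and read a file's contents.
        protected abstract List<Path> oldestUnsentFiles();
        protected abstract String readAll(Path file);

        @Override
        public void start(Map<String, String> props) {
            // Resume from the last flushed offsets, if any; otherwise start at 1.
            Map<String, Object> offset = context.offsetStorageReader().offset(PARTITION);
            if (offset != null && offset.get("nextSeq") != null)
                nextSeq = ((Number) offset.get("nextSeq")).longValue();
        }

        @Override
        public List<SourceRecord> poll() {
            List<SourceRecord> records = new ArrayList<>();
            for (Path file : oldestUnsentFiles()) {
                Map<String, Object> value = new HashMap<>();
                value.put("seq", nextSeq++);
                value.put("data", readAll(file));
                // The source offset carries the NEXT sequence-number to use,
                // e.g. { nextSeq=2 } on the record carrying seq 1.
                records.add(new SourceRecord(PARTITION,
                        Collections.singletonMap("nextSeq", nextSeq),
                        "my-topic", null, value));
            }
            return records;
        }

        @Override
        public void stop() { }

        @Override
        public String version() { return "0.1"; }
    }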

*Assume the following files are in the input-folder:*
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
* 2018-01-01_10_00_01-<GUID3>.data
* 2018-01-01_10_00_02-<GUID4>.data
…

*Now the following sequence of events is possible:*
* mySourceConnector.poll() —> [
  R1 = record({ seq: 1, data: <data from 2018-01-01_10_00_00-<GUID1>.data> }, { nextSeq=2 }),
  R2 = record({ seq: 2, data: <data from 2018-01-01_10_00_00-<GUID2>.data> }, { nextSeq=3 })
]
* data of R1 was sent and acknowledged
* mySourceConnector.commitRecord(R1)
* data of R2 was sent and acknowledged
* mySourceConnector.commitRecord(R2)
* offsets-committer kicks in around here and picks up the offsets from R1 and R2, so that the merged offsets to be written and flushed are { nextSeq=3 }
* mySourceConnector.poll() —> [
  R3 = record({ seq: 3, data: <data from 2018-01-01_10_00_01-<GUID3>.data> }, { nextSeq=4 })
]
* data of R3 was sent and acknowledged
* mySourceConnector.commitRecord(R3)
* offsets-committer finishes writing and flushing offsets { nextSeq=3 }
* mySourceConnector.commit()
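
To make the race explicit, here is a simplified sketch of the ordering inside the framework's periodic offset commit. This is not the actual WorkerSourceTask code, just my reading of the ordering that matters here:

    // Offsets queued AFTER beginFlush() are not part of this flush; they wait
    // for the next one. Yet task.commit() runs right after the flush, with no
    // indication of which offsets actually made it out.
    if (offsetWriter.beginFlush()) {     // snapshots the offsets queued so far: { nextSeq=3 }
        // R3 is acknowledged around here; its offset { nextSeq=4 } misses the snapshot
        offsetWriter.doFlush(callback);  // writes and flushes { nextSeq=3 }
    }
    task.commit();                       // called whether or not { nextSeq=4 } was included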

In the mySourceConnector.commit() implementation I believe that the data and offsets for R1, R2 and R3 have all been sent/written/flushed/acknowledged, and I therefore delete the following files (sketched in code after the list):
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
* 2018-01-01_10_00_01-<GUID3>.data
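
Here is a sketch of that commit() implementation, continuing the FileSeqSourceTask sketch above; filesSentSoFar() is another hypothetical bookkeeping helper, tracking the files whose records have been returned from poll():

    // Needs java.nio.file.Files, java.io.IOException and
    // org.apache.kafka.connect.errors.ConnectException.
    @Override
    public void commit() throws InterruptedException {
        // WRONG assumption: every record returned from poll() so far has had
        // both its data acknowledged AND its offsets written and flushed.
        for (Path file : filesSentSoFar()) {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new ConnectException("Could not delete " + file, e);
            }
        }
    }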
But the truth is that the data for R1, R2 and R3 has been sent with sequence-numbers 1, 2 and 3 respectively, while the flushed offsets say { nextSeq=3 }, not { nextSeq=4 } as I would indirectly expect. If the system crashes here, upon restart I will get { nextSeq=3 }, but the file containing the data that was supposed to get sequence-number 3 has already been deleted. Therefore I will end up with this next poll:
* poll() —> [
  R4 = record({ seq: 3, data: <data from 2018-01-01_10_00_02-<GUID4>.data> }, { nextSeq=4 })
]
If my system had worked correctly, I should have ended up with this next poll:
* poll() —> [
  R4 = record({ seq: 4, data: <data from 2018-01-01_10_00_02-<GUID4>.data> }, { nextSeq=5 })
]
The receiving side of my data will get two messages carrying the same sequence-number 3, and it will therefore incorrectly ignore the second message. Even if it double-checks by looking at the actual data of the two messages, and the content of <data from 2018-01-01_10_00_01-<GUID3>.data> and <data from 2018-01-01_10_00_02-<GUID4>.data> happens to be identical, it has no way of figuring out that the right thing to do is to handle both messages.
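
For illustration, a minimal sketch of the receiving side's dedup logic (hypothetical consumer-side code; handle() stands in for the business logic). After the crash above, a genuinely new message arrives carrying the repeated seq 3 and is wrongly dropped:

    public class DedupReceiver {
        private long lastSeq = 0; // highest sequence-number handled so far

        public void onMessage(long seq, String data) {
            if (seq <= lastSeq) {
                // Treated as a re-send of an already-handled message and skipped.
                // This is correct for true re-sends, but wrongly drops the NEW
                // message that reuses seq 3 after the crash described above.
                return;
            }
            lastSeq = seq;
            handle(data);
        }

        private void handle(String data) { /* hypothetical business logic */ }
    }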

*With my fix to the problem*, the call to commit() would have been
mySourceConnector.commit([R1, R2])
and I would know only to delete the following files:
* 2018-01-01_10_00_00-<GUID1>.data
* 2018-01-01_10_00_00-<GUID2>.data
And after a crash/restart I would end up sending the correct next message:
mySourceConnector.poll() —> [
  R3 = record({ seq: 3, data: <data from 2018-01-01_10_00_01-<GUID3>.data> }, { nextSeq=4 })
]
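
Under the proposed change, my commit() could then look like the sketch below. The List<SourceRecord> parameter is exactly the fix being argued for (not an existing Kafka Connect API), and fileFor() is a hypothetical lookup from a record back to its input file:

    @Override
    public void commit(List<SourceRecord> recordsWithFlushedOffsets) throws InterruptedException {
        // Delete only the files whose records are covered by offsets that
        // were actually written and flushed by the framework.
        for (SourceRecord r : recordsWithFlushedOffsets) {
            Path file = fileFor(r);
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new ConnectException("Could not delete " + file, e);
            }
        }
    }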
