On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (e...@confluent.io) 
wrote:
Randall, 

Great question. Ideally you wouldn't need this type of state since it 
should really be available in the source system. In your case, it might 
actually make sense to be able to grab that information from the DB itself, 
although that will also have issues if, for example, there have been 
multiple schema changes and you can no longer get a previous schema from 
the current state of the tables. 

I agree that ideally connectors would be stateless, or at least have no need 
for maintaining state across restarts. Unfortunately, that’s not always 
possible.

Reading the log but using the current schema does pose a problem if/when the 
schema has evolved since the point in the log that we’re currently reading. 
This is far more of an issue if you’re playing catch-up and there have been 
incompatible schema changes.

Case in point: when MySQL inserts/updates/removes a row from a table, it writes 
an event in the log that includes (a) a table identifier and (b) the row values 
in column-order. There is no other information. Column renames might be okay, 
but adding or removing columns will likely result in mismatching the row values 
to the appropriate columns.
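
To make the mismatch concrete, here is a minimal Java sketch. RowDecoder and 
decode are hypothetical names used only for illustration; they are not part of 
MySQL or Kafka Connect, and real log events carry typed values rather than 
plain objects.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowDecoder {
    // Pairs positional row values with column names. Position is the only
    // link between a value and its column, as in the log event described above.
    static Map<String, Object> decode(List<String> columns, List<Object> values) {
        if (columns.size() != values.size()) {
            throw new IllegalArgumentException(
                "schema has " + columns.size() + " columns but row has "
                    + values.size() + " values");
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            row.put(columns.get(i), values.get(i));
        }
        return row;
    }
}
```

Decoding a row written as (id, name) against a current schema of 
(id, email, name) fails the length check. If one column had been dropped and 
another added, the counts would match and the values would be silently 
mislabeled.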

Luckily, MySQL includes the DDL statements in the log, so my connector parses 
these as part of processing and builds up the schema state as it goes along. 
This works beautifully, with the only issue being how to persist and recover 
this after restarts.
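
Roughly, the idea looks like the following sketch. It is not the connector's 
actual code: SchemaTracker is a hypothetical name, and the regular expression 
only recognizes single-column ALTER TABLE ... ADD/DROP COLUMN statements, 
whereas real MySQL DDL needs a proper parser.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SchemaTracker {
    // Assumption: only this simplified statement form is handled.
    private static final Pattern ALTER = Pattern.compile(
        "ALTER\\s+TABLE\\s+(\\w+)\\s+(ADD|DROP)\\s+COLUMN\\s+(\\w+).*",
        Pattern.CASE_INSENSITIVE);

    // Table name -> column names in column order.
    private final Map<String, List<String>> columnsByTable = new HashMap<>();

    // Applies one DDL statement read from the log to the in-memory schema state.
    void apply(String ddl) {
        Matcher m = ALTER.matcher(ddl.trim());
        if (!m.matches()) {
            return; // statements this sketch does not understand are ignored
        }
        List<String> columns =
            columnsByTable.computeIfAbsent(m.group(1), t -> new ArrayList<>());
        if (m.group(2).equalsIgnoreCase("ADD")) {
            columns.add(m.group(3)); // appended last; FIRST/AFTER are ignored here
        } else {
            columns.remove(m.group(3));
        }
    }

    // Returns a copy of the current column order for a table.
    List<String> columnsOf(String table) {
        return new ArrayList<>(columnsByTable.getOrDefault(table, new ArrayList<>()));
    }
}
```

Because the state is derived purely from the sequence of DDL statements, 
replaying the same statements in the same order rebuilds it.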


The offset storage is probably pretty close to what you're looking for, 
although we obviously structure that very narrowly. Adding in some sort of 
other state store is an interesting idea, though I'd be curious how many 
other systems encounter similar challenges. I think one way to do this 
without huge changes and in a way that properly handles offset commits 
would be to expose a small API for setting local state and have Connect 
store that state right in the same topic (and message) as offsets. To 
handle offset commits and reviving tasks that hit a fault, we would just 
grab the current state as part of the process of committing offsets. Then 
offsets would just be a special case of that more general state. 

However, I'm also wary of doing something like this. Right now every worker 
has to consume the entire offsets topic. This works fine because offsets, 
while capable of being pretty complex, are generally pretty small such that 
there's no concern having to tail it on all workers (and no concern for the 
load on brokers leading those partitions). Once you provide a generic state 
storage mechanism without clear constraints on how it should be used, 
someone will come along and abuse it. Also, with offsets it is very clear 
(to the connector developer) which task should write to which keys (where 
the key here is the partition of the source partitioned stream). With 
support for arbitrary state, ownership of different subsets of the key 
space is very unclear. I think you may not have that particular problem 
because you probably only have 1 partition anyway since you are reading a 
transaction log. 

In any case, you're right that this doesn't exist today. There is one very 
hacky way to get around this, which is to store that schema information in 
your "offsets". This may not be *too* bad -- it'll increase the size of 
offset data, but probably doesn't affect much else. The data size may not 
be that bad as long as offsets aren't committed too frequently. In terms of 
performance, I'm assuming these schema changes are relatively rare, and you 
can just include the same schema object in every offset you create during 
the periods between schema changes so you (and the GC) are probably only 
doing a negligible amount of extra work. 

Hmm, it sounds like hammering accumulated state into the offsets could be pretty 
problematic and potentially risky, especially if the state has very different 
size and frequency characteristics than the offsets.
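
For reference, the workaround would look roughly like this sketch. The class 
name and the "file", "pos", and "schema" keys are made up for illustration, 
and it assumes the schema state has already been serialized to a string so 
the offset holds only simple values.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetWithSchema {
    // Builds one offset map that carries the log position plus the
    // serialized schema state (hypothetical key names).
    static Map<String, Object> offset(String logFile, long position,
                                      String serializedSchema) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("file", logFile);
        offset.put("pos", position);
        // The same String instance can be reused until the schema changes,
        // so building each offset costs little in memory.
        offset.put("schema", serializedSchema);
        return offset;
    }
}
```

Every committed offset then carries the whole schema string, which is why the 
size of the state and the commit frequency matter.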

Re: creating a consumer, Connect doesn't provide any utilities to do that 
since the goal is to handle everything Kafka-related for the connector 
developer so they can just focus on getting the data from the other system! 
We could consider exposing some of the worker config though, which I 
imagine is all you really need -- it'd just be convenient to have the 
connection info for the Kafka brokers. 

Having a way to get the worker config would be awesome, and IMO it's a nice 
minimalistic approach. If you think this is a good idea, I can log a JIRA and 
take it to the dev list. I’m willing to work on it, too. 

I’m starting to think that storing state on a separate dedicated topic is the 
best option, at least for me. First, connector tasks can easily record their 
state by simply adding more SourceRecord instances during polling. Second, that 
information might be useful for downstream consumers. And third, recording 
state this way requires no changes to the current Kafka Connect. I’d hate to 
add a feature to Kafka Connect that is not useful to others.

Recovery would require consuming this topic upon restart. If the state were 
incremental, then a restart might require consuming the entire topic. If the 
state were snapshots, then simply reading the last entry might be sufficient. 
There’s also the option of doing primarily incremental with periodic snapshots.

For my connector, I can easily store each DDL statement, making it incremental. 
Consuming the whole topic upon restart shouldn’t be that intensive. And if it 
is, then that only adds a bit to the restart time, which is no big deal.
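
The recovery options above could be sketched as follows. This is only an 
illustration under assumptions: StateRecovery and Entry are hypothetical 
names, and the list stands in for the records read back from the state topic 
in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StateRecovery {
    // One record on the (hypothetical) state topic.
    static final class Entry {
        final boolean snapshot;
        final List<String> ddl; // full history if snapshot, else new statements

        private Entry(boolean snapshot, List<String> ddl) {
            this.snapshot = snapshot;
            this.ddl = ddl;
        }

        static Entry snapshot(List<String> fullHistory) {
            return new Entry(true, new ArrayList<>(fullHistory));
        }

        static Entry increment(String... statements) {
            return new Entry(false, Arrays.asList(statements));
        }
    }

    // Rebuilds the DDL history: start at the latest snapshot if there is one,
    // otherwise at the beginning, then apply every later increment in order.
    static List<String> recover(List<Entry> topic) {
        int start = 0;
        for (int i = topic.size() - 1; i >= 0; i--) {
            if (topic.get(i).snapshot) {
                start = i;
                break;
            }
        }
        List<String> history = new ArrayList<>();
        for (int i = start; i < topic.size(); i++) {
            history.addAll(topic.get(i).ddl);
        }
        return history;
    }
}
```

With no snapshots on the topic this degenerates to a full replay, which is 
the purely incremental case I'd start with.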

Finally, I'd love to know which DB you're reading the transaction log from 
and if you're planning on open sourcing the connector:) 

As I mentioned, it’s MySQL at the moment and part of a very (!) new OSS project 
called Debezium (http://debezium.io), and the MySQL connector is the first of 
hopefully many connectors for a variety of databases. (Contributors welcome!)

Best regards,

Randall
