On January 29, 2016 at 11:59:28 AM, Ewen Cheslack-Postava (e...@confluent.io) wrote:
> On Fri, Jan 29, 2016 at 7:06 AM, Randall Hauch <rha...@gmail.com> wrote:
>
>> Luckily, MySQL includes the DDL statements in the log, so my connector parses these as part of processing and builds up the schema state as it goes along. This works beautifully, with the only issue being how to persist and recover this after restarts.
>
> Yeah, this is a common complaint about the MySQL binlog. I know James mentioned this as well. It's a bit crazy that you need a full parser for the DDL to make this work :/

Yes, it's a bit unfortunate, but entirely understandable. Log readers are usually other MySQL servers, which can easily handle the DDL. Since several other DBMSes' transaction logs were exposed, at least initially, to enable replication, I suspect needing to read DDL may be the case for several other DBMSes, too.

>> Having a way to get the worker config would be awesome, and IMO it's a nice minimalistic approach. If you think this is a good idea, I can log a JIRA and take it to the dev list. I'm willing to work on it, too.
>
> I think this is not going to be commonly used, but I think it'd be fine to expose it.

I'll send a message on the dev list and, if I don't hear otherwise, create a JIRA.

I'm starting to think that storing state in a separate dedicated topic is the best option, at least for me. First, connector tasks can easily record their state by simply adding more SourceRecord instances during polling. Second, that information might be useful to downstream consumers. And third, recording state this way requires no changes to the current Kafka Connect. I'd hate to add a feature to Kafka Connect that is not useful to others. Recovery would require consuming this topic upon restart. If the state were incremental, then a restart might require consuming the entire topic. If the state were snapshots, then simply reading the last entry might be sufficient. There's also the option of doing primarily incremental state with periodic snapshots.
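[Editor's sketch of the "extra SourceRecord instances during polling" idea discussed above. This is a self-contained illustration, not Debezium's actual code: the topic names, the `EmittedRecord` type, and the event classification are made up, and a real connector would emit Kafka Connect `SourceRecord` objects rather than this stand-in class.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SchemaHistorySketch {
    // Hypothetical stand-in for Kafka Connect's SourceRecord: a value routed
    // to a topic, plus the source offset (here, a binlog position).
    record EmittedRecord(String topic, Map<String, Object> sourceOffset, String value) {}

    // Made-up topic names for illustration only.
    static final String DATA_TOPIC = "mysql.inventory.customers";
    static final String SCHEMA_HISTORY_TOPIC = "mysql.schema-changes";

    /**
     * One "poll" pass over binlog events: DDL statements are emitted to a
     * dedicated schema-history topic (incremental state), while row changes
     * go to the normal data topic. Both kinds of record carry the same style
     * of source offset, so the schema state can later be recovered for any
     * restart position.
     */
    static List<EmittedRecord> poll(List<String> binlogEvents, long startPos) {
        List<EmittedRecord> records = new ArrayList<>();
        long pos = startPos;
        for (String event : binlogEvents) {
            Map<String, Object> offset = Map.of("binlog-pos", pos++);
            // Crude DDL detection for the sketch; the real connector parses DDL fully.
            if (event.startsWith("CREATE") || event.startsWith("ALTER") || event.startsWith("DROP")) {
                records.add(new EmittedRecord(SCHEMA_HISTORY_TOPIC, offset, event));
            } else {
                records.add(new EmittedRecord(DATA_TOPIC, offset, event));
            }
        }
        return records;
    }
}
```

Because the state records are ordinary source records, no Kafka Connect changes are needed: the framework writes them to the schema-history topic exactly as it writes data records.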
>> For my connector, I can easily store each DDL statement, making it incremental. Consuming the whole topic upon restart shouldn't be that intensive. And if it is, that only adds a bit to the restart time; no big deal.
>
> This all sounds like it should work fine, just remember that you'll have to store enough info with the DDL statement to indicate when it should be applied. The case you want to be careful of is if your connector processes the change and crashes before offsets are committed. When the process comes back up, you need to make sure that if it recovers and starts at an earlier offset, it will still be able to get the previous schema. That's the drawback of not being able to tie writes of state in with offset commits -- you need to handle potential inconsistencies.

Ack. I can easily handle this by storing the source offsets of the DDL statement events. This way, when the task comes back, it can recover the schema from whatever source position the task is supposed to start reading from.

>> As I mentioned, it's MySQL at the moment and part of a very (!) new OSS project called Debezium (http://debezium.io), and the MySQL connector is the first of hopefully many connectors for a variety of databases. (Contributors welcome!)
>
> Looks cool! I see you have at least skeletons for a couple of different DBs. Will the Kafka Connect adapter work with all of them?

That's the plan, though yes, the others are mostly skeletons. MongoDB is next, and I hope to adopt Bottled Water for PostgreSQL (I saw that Martin wanted to use Kafka Connect). The one thing I'm unsure of is the best way (other than strings) to put JSON-like values inside Structs, but that's another topic.

Thanks for the help!

Randall
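[Editor's sketch of the recovery approach described above: replay the schema-history topic from the beginning, applying only DDL statements recorded at or before the position the task will resume from. The `DdlEvent` type and the string-based "schema model" are illustrative stand-ins, not Debezium code; a real implementation would feed each statement to the connector's DDL parser.]

```java
import java.util.List;

public class SchemaRecoverySketch {
    // Hypothetical schema-history entry: the DDL text plus the binlog
    // position at which it took effect (the stored source offset).
    record DdlEvent(long binlogPos, String ddl) {}

    /**
     * Rebuild the in-memory schema model by replaying the schema-history
     * topic and applying only statements at or before the restart position.
     * This covers the crash-before-offset-commit case Ewen raises: DDL that
     * was recorded past the restart point is skipped here, and will simply
     * be re-read from the binlog (and re-recorded) as the task catches up.
     */
    static String recoverSchema(List<DdlEvent> history, long restartPos) {
        StringBuilder schemaModel = new StringBuilder(); // stand-in for parsed schema state
        for (DdlEvent e : history) {
            if (e.binlogPos() <= restartPos) {
                if (schemaModel.length() > 0) {
                    schemaModel.append("; ");
                }
                schemaModel.append(e.ddl()); // real code: ddlParser.parse(e.ddl())
            }
        }
        return schemaModel.toString();
    }
}
```

The comparison against `restartPos` is what makes the stored source offsets essential: without them, a replayed statement could not be ordered relative to the task's resume position.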