Hi Martin, I'll admit that I was somewhat interested in the notion of a topic for Avro schemas when Jakob mentioned it in passing, I like where you've taken it. Couple of questions:
When you say richly-typed serdes do you mean that some sort of Avro serde would actually deliver the message contents within the envelope as a GenericRecord or some other Avro-specific pre-constructed object instead of a bag of bytes? When you say the framework would handle things such as buffering of messages until new schemas became available etc do you see that as being encapsulated within the serde or something more generic than that? This does seem to be putting significantly more functionality (including state management) into the handling serde than we see in the existing ones. Not necessarily an intrinsically bad thing of course. After some investigation following from the previous messages in this topic I've decided that Avro messages do make sense for me as my main downstream system is already using Avro and the main upstream one is moving there. So definitely interested in this discussion and even though for my small number of schemas a registry would be pretty trivial it would also be nice to have the evolution given some machinery within Samza itself. Regards Garry -----Original Message----- From: Martin Kleppmann [mailto:[email protected]] Sent: 22 February 2014 18:20 To: [email protected] Subject: Re: Thinking about message formats I had another idea with regard to schema handling, and I'd be interested to hear what you think. Background: At LinkedIn, most of the data that goes through Kafka is encoded as Avro. Each message is a binary Avro record, prefixed with 17 bytes: the first byte is always zero (a version number, to enable future changes of the encoding scheme), and the next 16 bytes are the MD5 hash of the schema with which the record was encoded. In order to decode messages, you need to map those MD5 hashes back to the corresponding schema. That is done via a separate service, called a schema registry. Every producer that sends messages to Kafka must ensure that the schema it's using is known to the registry beforehand. Every consumer that sees a MD5 hash that it doesn't know can look up the schema by MD5 hash in the registry, and then use that schema to parse the rest of the message. That all works fine, and as long as you follow the rules for schema evolution, it's ok for a Kafka topic to contain a mixture of messages with old and new versions of a schema. But the downside is that you need a separate schema registry service. It occurred to me that if you're using Samza, the schema registry is superfluous. Instead, we can designate one Kafka topic to be the schema registry (call the topic "avro-schemas", for example). Every time a producer starts up, the first message it emits is to the avro-schemas topic, using the MD5 hash of the schema as the key of the message, and the Avro schema JSON as the value of the message. All subsequent messages it emits to other topics follow the encoding described above. In most cases, the key and value of the message to the avro-schemas topic will be identical to a previously-sent message, because the schema is already known, and Kafka's compaction by key will eventually remove the duplicates. But if you deploy a version of a producer that uses a new schema, you don't need to do any special registration of the schema; everyone consuming the avro-schemas topic will find out about the new schema as soon as the producer starts up. As a consumer, you consume the avro-schemas topic as well as whatever other topics you want. Every message on the avro-schemas topic is persisted to a local key-value store. Thus every consumer has its own copy of the schema registry, and can decode any Avro-encoded message that comes in on the other topics. Some caveats: - Each Samza task needs to consume all partitions of the avro-schemas topic (or, equivalently, the avro-schemas topic must have only a single partition). I don't expect that to be a problem, since the expected throughput of this topic is quite low (producers don't restart that often), and the total data volume is quite small (you might have a few thousand distinct schemas, even in a large organization). - There is a race condition when a new producer first starts up, where consumers may see messages encoded with the new schema before they see the new schema itself appear on the avro-schemas topic. In that case, the messages encoded with the new topic would need to be buffered (or discarded and the stream rewound) until the notification of the new schema comes in on the avro-schemas topic. That could be done transparently by the framework, so jobs wouldn't have to worry about it. My feeling is that this would be an elegant and powerful way of providing richly typed serdes in Samza, without imposing a schema management burden on job authors. The fiddly details of encoding schema versions and performing schema evolution would be abstracted away by the framework. What do you think? Martin On 13 Feb 2014, at 21:19, Steve Yates <[email protected]> wrote: > It's not an easy problem to solve however we chose protocol buffers in the > past as schema evolution and maintaning compatibility with older messages was > supported as long as you appended the additional attributes on the bottom of > the schema def. > > -S > > -------- Original message -------- > From: Martin Kleppmann <[email protected]> > Date: > To: "<[email protected]>" > <[email protected]> > Subject: Re: Thinking about message formats > > Hi Garry, > > Yes, that issue often trips up people who are new to Avro. It's also Avro's > main difference to Thrift and Protocol Buffers, which use a different > approach to schema evolution. This blog post may be helpful for understanding > the different approaches: > http://martin.kleppmann.com/2012/12/05/schema-evolution-in-avro-protoc > ol-buffers-thrift.html > > In your case, if you're not already committed to Avro for other reasons, then > Protocol Buffers or Thrift may work slightly better for you. They include an > identification tag on each field name, and when a parser encounters a field > with a tag it doesn't know, that field is ignored. This approach means the > Samza job doesn't have to know about the latest schema, and all messages > simply get interpreted as the version of the schema that was compiled into > the Samza job. > > Cheers, > Martin > > On 13 Feb 2014, at 17:31, Jakob Homan <[email protected]> wrote: >> Yeah, schema evolution for never-ending jobs has been a bit annoying >> so far. So long as the Samza job has the latest version of the >> schema for producing and the messages arrive as generic records with >> their schema intact, one can transcode the newly arrived messages to the >> latest schema. >> This is what we're doing. It's a manual step and I'd rather if Avro >> better supported this automatically, but it works for now. I've been >> meaning to clean up the code and put it into Samza proper in a util >> package or something. >> >> This approach does have a couple issues: >> (1) If there is a new version of the schema, one needs to restart the >> job with the latest version. It could be possible to have another >> stream that published schema changes (getting a bit meta here) and >> the job could subscribe to that, updating as necessary. Making sure >> everything hits in the right order is a bit tricky. Alternatively, >> one could look for schema changes and assume any new one is the >> latest one, reconfiguring as necessary. So far we've been using the >> restart-the-job approach. >> (2) Transcoding effectively bumps each message up to the newest >> version as they traverse the Samza pipelines. This can be a good >> thing or not, depending on what you're looking for. I don't think >> there's a one true correct approach to online schema evolution; it'll >> come down to a policy decision. >> -jg >> >> >> >> On Thu, Feb 13, 2014 at 7:12 AM, Garry Turkington < >> [email protected]> wrote: >> >>> Hi Martin, >>> >>> Thanks for the input, and don't worry, it did make sense. :) >>> >>> Though it did identify a hole in my understanding of Avro; I think >>> I've been spoiled using container files that included the schema. I >>> thought that a reader could process an Avro message with a previous >>> version of the schema, I didn't know it needed to know the exact >>> schema with which the message was written. >>> >>> I normally have a new column added to my input files, often with a >>> default value and then afterwards add the specific processing that >>> actually uses the new data. So in the first case I don't care if the >>> reader doesn't see the new column, I just don't want it to break >>> when newer version messages come along. But sounds like it's not >>> quite that straightforward and before any consumer receives any >>> messages written with a new schema that I'll need make that new schema >>> available to the consumer in some way. >>> >>> Thanks, >>> Garry >>> >>> -----Original Message----- >>> From: Martin Kleppmann [mailto:[email protected]] >>> Sent: 13 February 2014 15:11 >>> To: <[email protected]> >>> Subject: Re: Thinking about message formats >>> >>> Hi Garry, >>> >>> We use Avro a lot, and it works well with Samza. Schema evolution is >>> very good thing to have in your toolbox. >>> >>> One thing to keep in mind with Avro: in order to parse a message, >>> you need to know the exact schema with which the data was written. >>> You may have multiple different producers writing messages with >>> different schemas, so the messages need to be tagged with the schema >>> version they're using. The consumer (Samza job) can then use schema >>> evolution to map all messages to the same schema (e.g. the latest version >>> of the schema). >>> >>> There currently isn't a standard way for tagging an individual >>> Avro-encoded message with the schema that was used to encode it. The >>> simplest you could do is to give every version of your schema a >>> unique version number, and to prefix every message in Kafka with the >>> schema version number. The version number could be a hash of the >>> schema, or a sequential number that you assign manually. >>> >>> You can then compile the mapping from version numbers to Avro >>> schemas into your Samza job. For every message that comes in, you >>> look at the version number prefix, and parse the rest of the message >>> with the appropriate schema. (However, with this approach you need >>> to deploy the Samza job to be aware of any new schema version before >>> any producer starts generating messages in the new schema.) >>> Alternatively, you can keep the mapping from version numbers to >>> schemas in a separate service, and look up version numbers on >>> demand. This has been discussed in >>> https://issues.apache.org/jira/browse/AVRO-1124 but afaik is not yet >>> publicly available. >>> >>> Is this making any sense? I fear it sounds a bit confusing. >>> >>> Martin >>> >>> On 13 Feb 2014, at 13:27, Garry Turkington < >>> [email protected]> wrote: >>>> Hi, >>>> >>>> I was thinking about how best to do testing on Samza jobs. The >>>> ability >>> to replay streams appears to help a lot here as by pushing some data >>> into the consumed streams then rewinding it is always possible to >>> get the same data fed through the tasks. So that helps a lot in >>> terms of dealing with known input data and such. >>>> >>>> But then I started thinking about message format evolution over >>>> time >>> which in honesty wasn't something I had considered before. My >>> primary use cases for Samza are pulling apart lots of log files as >>> they arrive so the obvious thing is to push each record/line as a >>> single message. The problem of course is that as those log formats >>> evolve over time (almost always by having new columns added) that I >>> need change both the ingest mechanism and the Samza tasks; firstly >>> just not to be broken by the new format, secondly to actually use the >>> additional columns if appropriate. >>>> >>>> At which point Avro seems to have lots of value as a message >>>> format, >>> we're moving to use it elsewhere in the data backend for very >>> similar reasons of ability to manage schema evolution. >>>> >>>> Anyone went down this path at all? I guess there are two >>>> approaches, >>> just have Samza treat the Avro message as a string and have each >>> task parse and extract the fields of interest or to build an Avro >>> serde that delivers an Avro record object in the envelope. >>>> >>>> Thanks >>>> Garry >>>> >>>> >>> ........................................................................................................................... >>>> Garry Turkington | CTO | +44-7871315944 | skypeGarryTurkington: >>>> Improve Digital - Real time advertising technology A company of >>>> PubliGroupe >>>> >>>> cid:[email protected] >>>> >>> >>> >>> ----- >>> No virus found in this message. >>> Checked by AVG - www.avg.com >>> Version: 2014.0.4259 / Virus Database: 3697/7086 - Release Date: >>> 02/12/14 >>> >
