I had another idea with regard to schema handling, and I'd be interested to 
hear what you think.

Background: At LinkedIn, most of the data that goes through Kafka is encoded as 
Avro. Each message is a binary Avro record, prefixed with 17 bytes: the first 
byte is always zero (a version number, to enable future changes of the encoding 
scheme), and the next 16 bytes are the MD5 hash of the schema with which the 
record was encoded.
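
For concreteness, here's roughly what that encoding could look like in Java (a
sketch, not LinkedIn's actual code; I'm simply taking the MD5 of the schema's
JSON text):

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public class AvroEnvelopeWriter {
        // 1 version byte (always 0) + 16-byte MD5 of the writer's schema,
        // followed by the Avro binary encoding of the record.
        public static byte[] encode(Schema schema, GenericRecord record) throws Exception {
            byte[] md5 = MessageDigest.getInstance("MD5")
                    .digest(schema.toString().getBytes(StandardCharsets.UTF_8));

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0);    // version byte
            out.write(md5);  // schema fingerprint

            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        }
    }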

In order to decode messages, you need to map those MD5 hashes back to the 
corresponding schema. That is done via a separate service, called a schema 
registry. Every producer that sends messages to Kafka must ensure that the 
schema it's using is known to the registry beforehand. Any consumer that sees 
an MD5 hash it doesn't know can look up the corresponding schema in the 
registry, and then use that schema to parse the rest of the message.
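
The consumer side is the mirror image. Again just a sketch; the SchemaLookup
interface here is a hypothetical stand-in for whatever client talks to the
registry (or, further down, the local store):

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class AvroEnvelopeReader {
        // Hypothetical stand-in for the registry lookup.
        public interface SchemaLookup {
            Schema findByMd5(byte[] md5);
        }

        public static GenericRecord decode(byte[] msg, SchemaLookup registry) throws Exception {
            if (msg[0] != 0) {
                throw new IllegalArgumentException("Unknown envelope version: " + msg[0]);
            }
            Schema writerSchema = registry.findByMd5(Arrays.copyOfRange(msg, 1, 17));
            BinaryDecoder decoder = DecoderFactory.get()
                    .binaryDecoder(msg, 17, msg.length - 17, null);
            return new GenericDatumReader<GenericRecord>(writerSchema).read(null, decoder);
        }
    }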

That all works fine, and as long as you follow the rules for schema evolution, 
it's ok for a Kafka topic to contain a mixture of messages with old and new 
versions of a schema. But the downside is that you need a separate schema 
registry service.

It occurred to me that if you're using Samza, the schema registry is 
superfluous. Instead, we can designate one Kafka topic to be the schema 
registry (call the topic "avro-schemas", for example).

Every time a producer starts up, the first message it emits is to the 
avro-schemas topic, using the MD5 hash of the schema as the key of the message, 
and the Avro schema JSON as the value of the message. All subsequent messages 
it emits to other topics follow the encoding described above.
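
Sketched against the Kafka Java producer API (only for illustration; the
Properties are assumed to configure byte[] key/value serializers, and the
topic name is as above):

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SchemaAnnouncer {
        // Publish (MD5 of schema -> schema JSON) before sending any data messages.
        public static void announce(Properties kafkaProps, Schema schema) throws Exception {
            byte[] key = MessageDigest.getInstance("MD5")
                    .digest(schema.toString().getBytes(StandardCharsets.UTF_8));
            byte[] value = schema.toString().getBytes(StandardCharsets.UTF_8);

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps)) {
                producer.send(new ProducerRecord<>("avro-schemas", key, value)).get();
            }
        }
    }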

In most cases, the key and value of the message to the avro-schemas topic will 
be identical to a previously-sent message, because the schema is already known, 
and Kafka's compaction by key will eventually remove the duplicates. But if you 
deploy a version of a producer that uses a new schema, you don't need to do any 
special registration of the schema; everyone consuming the avro-schemas topic 
will find out about the new schema as soon as the producer starts up.

As a consumer, you consume the avro-schemas topic as well as whatever other 
topics you want. Every message on the avro-schemas topic is persisted to a 
local key-value store. Thus every consumer has its own copy of the schema 
registry, and can decode any Avro-encoded message that comes in on the other 
topics.
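
In Samza terms, something along these lines (a sketch; the store name
"schemas" and the byte[] serdes are assumptions about how the job would be
configured):

    import java.util.Arrays;

    import org.apache.avro.Schema;
    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class SchemaAwareTask implements StreamTask, InitableTask {
        private KeyValueStore<byte[], byte[]> schemas;  // local copy of the registry

        @SuppressWarnings("unchecked")
        public void init(Config config, TaskContext context) {
            // "schemas" would be declared as a store in the job config
            schemas = (KeyValueStore<byte[], byte[]>) context.getStore("schemas");
        }

        public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                            TaskCoordinator coordinator) throws Exception {
            if ("avro-schemas".equals(envelope.getSystemStreamPartition().getStream())) {
                // remember md5 -> schema JSON locally
                schemas.put((byte[]) envelope.getKey(), (byte[]) envelope.getMessage());
                return;
            }
            byte[] raw = (byte[]) envelope.getMessage();
            byte[] schemaJson = schemas.get(Arrays.copyOfRange(raw, 1, 17));
            if (schemaJson == null) {
                return;  // schema not seen yet -- see the race-condition caveat below
            }
            Schema writerSchema = new Schema.Parser().parse(new String(schemaJson, "UTF-8"));
            // ... decode the rest of raw with writerSchema, as in the sketch above
        }
    }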

Some caveats:

- Each Samza task needs to consume all partitions of the avro-schemas topic 
(or, equivalently, the avro-schemas topic must have only a single partition). I 
don't expect that to be a problem, since the expected throughput of this topic 
is quite low (producers don't restart that often), and the total data volume is 
quite small (you might have a few thousand distinct schemas, even in a large 
organization).

- There is a race condition when a new producer first starts up: consumers 
may see messages encoded with the new schema before they see the new schema 
itself appear on the avro-schemas topic. In that case, the messages encoded 
with the new schema would need to be buffered (or discarded and the stream 
rewound) until the notification of the new schema comes in on the 
avro-schemas topic. That could be done transparently by the framework, so jobs 
wouldn't have to worry about it. (A rough sketch follows below.)
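
Here's a trivial sketch of that buffering (hashes as hex strings so they can
be used as map keys; ordering and restoration on restart are left out):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Messages whose schema hash is still unknown are parked until that schema
    // shows up on the avro-schemas topic, then released for processing.
    public class PendingMessages {
        private final Map<String, List<byte[]>> pending = new HashMap<>();

        public void park(String schemaMd5Hex, byte[] rawMessage) {
            List<byte[]> list = pending.get(schemaMd5Hex);
            if (list == null) {
                list = new ArrayList<>();
                pending.put(schemaMd5Hex, list);
            }
            list.add(rawMessage);
        }

        public List<byte[]> release(String schemaMd5Hex) {
            List<byte[]> ready = pending.remove(schemaMd5Hex);
            return ready != null ? ready : new ArrayList<byte[]>();
        }
    }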


My feeling is that this would be an elegant and powerful way of providing 
richly typed serdes in Samza, without imposing a schema management burden on 
job authors. The fiddly details of encoding schema versions and performing 
schema evolution would be abstracted away by the framework.

What do you think?

Martin


On 13 Feb 2014, at 21:19, Steve Yates <[email protected]> wrote:
> It's not an easy problem to solve. However, we chose Protocol Buffers in the 
> past, as schema evolution and maintaining compatibility with older messages 
> was supported as long as you appended the additional attributes at the bottom 
> of the schema def.
> 
> -S
> 
> -------- Original message --------
> From: Martin Kleppmann <[email protected]> 
> Date:  
> To: "<[email protected]>" <[email protected]> 
> Subject: Re: Thinking about message formats 
> 
> Hi Garry,
> 
> Yes, that issue often trips up people who are new to Avro. It's also Avro's 
> main difference to Thrift and Protocol Buffers, which use a different 
> approach to schema evolution. This blog post may be helpful for understanding 
> the different approaches:
> http://martin.kleppmann.com/2012/12/05/schema-evolution-in-avro-protocol-buffers-thrift.html
> 
> In your case, if you're not already committed to Avro for other reasons, then 
> Protocol Buffers or Thrift may work slightly better for you. They include an 
> identification tag on each field, and when a parser encounters a field 
> with a tag it doesn't know, that field is ignored. This approach means the 
> Samza job doesn't have to know about the latest schema, and all messages 
> simply get interpreted as the version of the schema that was compiled into 
> the Samza job.
> 
> Cheers,
> Martin
> 
> On 13 Feb 2014, at 17:31, Jakob Homan <[email protected]> wrote:
>> Yeah, schema evolution for never-ending jobs has been a bit annoying so
>> far.  So long as the Samza job has the latest version of the schema for
>> producing and the messages arrive as generic records with their schema
>> intact, one can transcode the newly arrived messages to the latest schema.
>> This is what we're doing.  It's a manual step and I'd rather Avro
>> supported this automatically, but it works for now.  I've been
>> meaning to clean up the code and put it into Samza proper in a util package
>> or something.
>> 
>> This approach does have a couple issues:
>> (1) If there is a new version of the schema, one needs to restart the job
>> with the latest version.  It could be possible to have another stream that
>> published schema changes (getting a bit meta here) and the job could
>> subscribe to that, updating as necessary.  Making sure everything hits in
>> the right order is a bit tricky.  Alternatively, one could look for schema
>> changes and assume any new one is the latest one, reconfiguring as
>> necessary.  So far we've been using the restart-the-job approach.
>> (2) Transcoding effectively bumps each message up to the newest version as
>> it traverses the Samza pipelines.  This can be a good thing or not,
>> depending on what you're looking for.  I don't think there's a one true
>> correct approach to online schema evolution; it'll come down to a policy
>> decision.
>> -jg
>> 
>> 
>> 
>> On Thu, Feb 13, 2014 at 7:12 AM, Garry Turkington <
>> [email protected]> wrote:
>> 
>>> Hi Martin,
>>> 
>>> Thanks for the input, and don't worry, it did make sense. :)
>>> 
>>> Though it did identify a hole in my understanding of Avro; I think I've
>>> been spoiled using container files that included the schema. I thought that
>>> a reader could process an Avro message with a previous version of the
>>> schema; I didn't know it needed to know the exact schema with which the
>>> message was written.
>>> 
>>> I normally have a new column added to my input files, often with a default
>>> value, and only afterwards add the specific processing that actually uses
>>> the new data. So in the first case I don't care if the reader doesn't see
>>> the new column; I just don't want it to break when newer-version messages
>>> come along. But it sounds like it's not quite that straightforward, and that
>>> before any consumer receives any messages written with a new schema I'll
>>> need to make that new schema available to the consumer in some way.
>>> 
>>> Thanks,
>>> Garry
>>> 
>>> -----Original Message-----
>>> From: Martin Kleppmann [mailto:[email protected]]
>>> Sent: 13 February 2014 15:11
>>> To: <[email protected]>
>>> Subject: Re: Thinking about message formats
>>> 
>>> Hi Garry,
>>> 
>>> We use Avro a lot, and it works well with Samza. Schema evolution is a very
>>> good thing to have in your toolbox.
>>> 
>>> One thing to keep in mind with Avro: in order to parse a message, you need
>>> to know the exact schema with which the data was written. You may have
>>> multiple different producers writing messages with different schemas, so
>>> the messages need to be tagged with the schema version they're using. The
>>> consumer (Samza job) can then use schema evolution to map all messages to
>>> the same schema (e.g. the latest version of the schema).
>>> 
>>> There currently isn't a standard way for tagging an individual
>>> Avro-encoded message with the schema that was used to encode it. The
>>> simplest thing you could do is give every version of your schema a unique
>>> version number, and prefix every message in Kafka with that schema
>>> version number. The version number could be a hash of the schema, or a
>>> sequential number that you assign manually.
>>> 
>>> You can then compile the mapping from version numbers to Avro schemas into
>>> your Samza job. For every message that comes in, you look at the version
>>> number prefix, and parse the rest of the message with the appropriate
>>> schema. (However, with this approach you need to redeploy the Samza job so it
>>> is aware of any new schema version before any producer starts generating
>>> messages with the new schema.) Alternatively, you can keep the mapping from
>>> version numbers to schemas in a separate service, and look up version
>>> numbers on demand. This has been discussed in
>>> https://issues.apache.org/jira/browse/AVRO-1124 but afaik is not yet
>>> publicly available.
>>> 
>>> Is this making any sense? I fear it sounds a bit confusing.
>>> 
>>> Martin
>>> 
>>> On 13 Feb 2014, at 13:27, Garry Turkington <
>>> [email protected]> wrote:
>>>> Hi,
>>>> 
>>>> I was thinking about how best to do testing on Samza jobs. The ability
>>> to replay streams appears to help a lot here: by pushing some data into
>>> the consumed streams and then rewinding, it is always possible to get the
>>> same data fed through the tasks. So that helps a lot in terms of dealing
>>> with known input data and such.
>>>> 
>>>> But then I started thinking about message format evolution over time,
>>> which in honesty wasn't something I had considered before. My primary use
>>> cases for Samza are pulling apart lots of log files as they arrive, so the
>>> obvious thing is to push each record/line as a single message. The problem,
>>> of course, is that as those log formats evolve over time (almost always by
>>> having new columns added) I need to change both the ingest mechanism and
>>> the Samza tasks; firstly just not to be broken by the new format, secondly
>>> to actually use the additional columns if appropriate.
>>>> 
>>>> At which point Avro seems to have lots of value as a message format;
>>> we're moving to use it elsewhere in the data backend for very similar
>>> reasons, namely the ability to manage schema evolution.
>>>> 
>>>> Has anyone gone down this path at all? I guess there are two approaches:
>>> either have Samza treat the Avro message as a string and have each task parse
>>> and extract the fields of interest, or build an Avro serde that delivers
>>> an Avro record object in the envelope.
>>>> 
>>>> Thanks
>>>> Garry
>>>> 
>>>> 
>>> ...........................................................................................................................
>>>> Garry Turkington | CTO | +44-7871315944 | skype: GarryTurkington
>>>> Improve Digital - Real time advertising technology
>>>> A company of PubliGroupe
>>>> 
>>>> cid:[email protected]
>>>> 
>>> 
>>> 
> 
