[ https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710216#comment-14710216 ]

Ewen Cheslack-Postava commented on KAFKA-2367:
----------------------------------------------

I posted an initial patch. It covers roughly the functionality of Avro's APIs. 
The actual data API code (not tests) clocks in at < 600 lines, substantially 
smaller than the code from Avro (which didn't even have GenericRecord support 
included...). Overall the code (including tests) shrank by 1200 lines and now 
has more functionality. The API is broader than Avro's in some ways (more 
primitive types to cover types available in other serialization formats, 
support for typed keys in maps, etc.); in other ways it currently omits some 
things (enums, unions). 

Some notes and thoughts:

* I have not defined any logical types yet, but they can be implemented for any 
type and identified by the name + version fields. I think most of the effort 
here is in the specification. The implementation is just providing the schema 
definitions, and occasionally including conversion utilities for built-in Java 
types that correspond to the logical type and are likely to be used commonly.
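As a rough illustration, a hypothetical decimal logical type would be little 
more than a named schema plus conversion helpers for the corresponding Java 
type (the class and names below are made up, and the Schema/SchemaBuilder 
calls are only my reading of the proposed API, not code from the patch):

    // Hypothetical logical type: a decimal carried physically as bytes.
    // It is identified purely by the schema name (and, optionally, version);
    // helpers convert to/from the natural built-in Java type.
    public class DecimalLogicalType {
        public static final String LOGICAL_NAME = "example.Decimal";

        public static Schema schema() {
            return SchemaBuilder.bytes().name(LOGICAL_NAME).build();
        }

        // Conversion utilities for the corresponding built-in Java type.
        public static byte[] fromLogical(java.math.BigDecimal value) {
            return value.unscaledValue().toByteArray();
        }

        public static java.math.BigDecimal toLogical(byte[] value, int scale) {
            return new java.math.BigDecimal(new java.math.BigInteger(value), scale);
        }
    }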

* The version field is a bit weird - it is a byte[]. This lets you, e.g., use a 
network byte order integer directly if you want to. I considered just requiring 
a string instead. The overhead of encoding to a string seems pretty minimal for 
the versioning schemes I can think of (incrementing integers, hashes), but the 
byte[] seemed like the right way to handle it. One thing the 
documentation for version numbers doesn't currently define is how ordering 
works. I'm not sure versions are going to be very useful without this (besides 
uniquely identifying the schema, it would be nice to know whether a schema is 
newer or older than another one), but I'm not sure everyone will have a 
versioning scheme that allows for comparing versions without contacting an 
external service.
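For example, an incrementing integer version could be written directly as 
network-byte-order bytes (plain Java, just to illustrate; not from the patch):

    import java.nio.ByteBuffer;

    // Network byte order (big-endian) is ByteBuffer's default order, so this
    // stores version 3 as four big-endian bytes.
    byte[] version = ByteBuffer.allocate(4).putInt(3).array();

    // Any scheme (counters, hashes) fits in a byte[], but nothing here says
    // how to order two versions relative to each other.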

* I provided support for byte[] and ByteBuffer. byte[] is really annoying since 
equals/hashCode don't work for it; the situation is made worse because it can 
be nested, arbitrarily deeply, in lists and maps. I haven't tried to address 
the equals/hashCode problems in those cases. I think it's better to recommend 
using ByteBuffer when you need to be able to test for equality/compute 
hashcodes. However, this also needs to be respected by deserializers/converters.
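A quick illustration of why byte[] is painful here (plain Java):

    byte[] a = {1, 2, 3};
    byte[] b = {1, 2, 3};
    System.out.println(a.equals(b));                    // false: identity, not content
    System.out.println(java.util.Arrays.equals(a, b));  // true, but nested lists/maps never call this

    java.nio.ByteBuffer x = java.nio.ByteBuffer.wrap(new byte[]{1, 2, 3});
    java.nio.ByteBuffer y = java.nio.ByteBuffer.wrap(new byte[]{1, 2, 3});
    System.out.println(x.equals(y));                    // true: content-based equals/hashCode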

* Made schemas explicit in the Copycat APIs. Previously schemas were inferred 
for primitive types. I think making this explicit might be a bit more effort 
for connector developers, but it's ultimately the right thing to do. There are 
too many weird edge cases you can get into without explicitly specifying the 
schema, especially with list/map types (if it's empty, what are the key/value 
schemas?) and optional values (if you encounter a null, what's the schema?).
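The empty-map case is the clearest example (sketch only; the map() builder and 
primitive schema constants are how I'd expect them to look, not verbatim from 
the patch):

    // The schema carries the key/value types explicitly, so even an empty map
    // is unambiguous to downstream converters/serializers.
    Schema countsSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build();
    java.util.Map<String, Integer> counts = new java.util.HashMap<>();  // empty, yet string -> int32 is still known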

* Caching of schema conversions. In the JSON implementation we're including, 
we're probably being pretty wasteful right now since every record has to 
translate both the schema and data to JSON. We should definitely be doing some 
caching here. I think an LRU using an IdentityHashMap should be fine. However, 
this does assume that connectors are good about reusing schemas (defining them 
up front, or if they are dynamic they should have their own cache of schemas 
and be able to detect when they can be reused).
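Since IdentityHashMap itself doesn't track access order, one way to get both 
identity-based keys and LRU eviction is a small wrapper like this (a sketch, 
not part of the patch):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Bounded LRU cache keyed by object identity. Hits only happen when the
    // connector reuses the same Schema instance, which is the assumption above.
    public class IdentityLruCache<K, V> {
        private final int maxSize;
        private final LinkedHashMap<IdentityKey<K>, V> cache;

        public IdentityLruCache(int maxSize) {
            this.maxSize = maxSize;
            this.cache = new LinkedHashMap<IdentityKey<K>, V>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<IdentityKey<K>, V> eldest) {
                    return size() > IdentityLruCache.this.maxSize;
                }
            };
        }

        public V get(K key) { return cache.get(new IdentityKey<>(key)); }

        public void put(K key, V value) { cache.put(new IdentityKey<>(key), value); }

        // Wrapper that forces reference equality, like IdentityHashMap does.
        private static final class IdentityKey<K> {
            private final K key;
            IdentityKey(K key) { this.key = key; }
            @Override public boolean equals(Object o) {
                return o instanceof IdentityKey && ((IdentityKey<?>) o).key == key;
            }
            @Override public int hashCode() { return System.identityHashCode(key); }
        }
    }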

* Many serialization formats don't support unions natively. They require a 
separate struct that contains optional fields for all the types (and might 
provide syntactic sugar, but it has no impact on the actual encoding). When you 
make nullability part of every type, I think the need for unions pretty much 
disappears -- creating the appropriate struct is trivial.
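As a sketch, a union of string | int32 just becomes a struct with one optional 
field per branch (builder calls are illustrative, not verbatim from the patch):

    // Exactly one of these optional fields would be populated per record.
    Schema stringOrInt32 = SchemaBuilder.struct()
        .name("example.StringOrInt32")
        .field("string_value", SchemaBuilder.string().optional().build())
        .field("int32_value", SchemaBuilder.int32().optional().build())
        .build();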

* I didn't include an IndexedRecord-like interface. I did allow setting a field 
by passing in the Field object instead of the field name, which avoids the hash 
lookup of the field name and has basically the same effect as setting by index. 
I didn't want to expose something like put(int fieldNumber, Object value) 
because using an array internally in Struct is really an implementation detail 
to avoid object allocations.
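For example (illustrative calls, not verbatim from the patch):

    Schema schema = SchemaBuilder.struct().field("id", SchemaBuilder.int32().build()).build();
    Struct struct = new Struct(schema);

    struct.put("id", 42);                  // resolves the field by name (hash lookup) on every call

    Field idField = schema.field("id");    // resolve once up front...
    struct.put(idField, 42);               // ...then put by Field, skipping the per-call lookup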

* The fluent API for SchemaBuilder is different from how Avro's works. It uses 
a single class for building schemas and doesn't directly handle nesting (you 
can just run another SchemaBuilder inline in the calls to the parent). This 
actually is a substantial difference -- in Copycat the expectation is that most 
schemas are created dynamically (most connectors do *not* have fixed schemas to 
work with), whereas Avro's seems more targeted towards static use cases (not 
surprisingly). Avro's more complex implementation can catch certain types of 
errors at compile time (there are lots of different *Builder classes with 
various abilities/restrictions), but for Copycat this doesn't help much and 
just makes the code more verbose. See, e.g., 
https://github.com/confluentinc/copycat/blob/master/avro/src/main/java/io/confluent/copycat/avro/AvroData.java
where a lot of the code paths are basically the same and the code could be a 
lot more concise, except that when I tried to do so I couldn't get the 
different types from Avro's SchemaBuilder classes to work out properly. Since a 
lot of schema building in Copycat is performed incrementally and can't *always* 
be chained nicely without a lot of code duplication, having just a single type 
is nice and gives you more flexibility in how you structure your code. The cost 
is that there are some runtime exceptions you might need to be careful to 
catch.
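Nesting with the single builder class just means running another builder 
inline (or building the nested schema first), e.g. (illustrative):

    Schema personSchema = SchemaBuilder.struct()
        .name("example.Person")
        .field("name", SchemaBuilder.string().build())
        .field("address", SchemaBuilder.struct()
            .name("example.Address")
            .field("street", SchemaBuilder.string().build())
            .field("zip", SchemaBuilder.string().optional().build())
            .build())
        .build();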

* There is only a single API for Struct and struct building, i.e. Structs are 
mutable and there is no separate StructBuilder. This keeps the API simpler and 
more concise, and there are clear points in the Copycat runtime where we can 
call validate() on the data if we want to. This also means that Structs are 
"fluent", i.e. methods consistently return this for chaining. This may seem an 
odd choice given that I did create separate Schema and SchemaBuilder classes, 
but I think that makes sense since you want schemas to be immutable once you've 
created them (which we don't quite enforce fully due to returning a mutable 
list of fields, but it's pretty close).
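So building a record value looks something like this (illustrative):

    Schema personSchema = SchemaBuilder.struct()
        .field("name", SchemaBuilder.string().build())
        .field("age", SchemaBuilder.int32().optional().build())
        .build();

    // Structs are mutable and fluent: put() returns this, so calls chain.
    Struct person = new Struct(personSchema)
        .put("name", "Alice")
        .put("age", 30);

    // The runtime can validate at well-defined points, e.g. before serialization.
    person.validate();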

* The combination of optional/nullable and default is a bit tricky and I think 
the semantics vary across the different serializers. What do you do when you 
have an optional field and a default? If you use null (or the field simply not 
being serialized) to indicate the value hasn't been set, then you can't 
*explicitly* set the field 
to null. If you do something more complicated, you have to track more 
state/create more objects when setting field values (something like 
Option<Object> rather than just Object). The result of this is that if you 
actually want to be able to encode nulls, you cannot provide a non-null default 
value. In other words, the way to think of the default value is as a *getter* 
default value, not a *putter* default value: you only use the default value if 
a get() call would return null. I think I've gotten this right, but it's 
something to watch out for when reviewing.
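A small example of the "getter default" semantics (illustrative):

    Schema schema = SchemaBuilder.struct()
        .field("retries", SchemaBuilder.int32().optional().defaultValue(3).build())
        .build();

    Struct s = new Struct(schema);
    // The field was never set -- and an explicit put("retries", null) would be
    // indistinguishable -- so get() falls back to the default value.
    Object retries = s.get("retries");   // 3, the getter-side default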

> Add Copycat runtime data API
> ----------------------------
>
>                 Key: KAFKA-2367
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2367
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: copycat
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Ewen Cheslack-Postava
>             Fix For: 0.8.3
>
>
> Design the API used for runtime data in Copycat. This API is used to 
> construct schemas and records that Copycat processes. This needs to be a 
> fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to 
> support complex, varied data types that may be input from/output to many data 
> systems.
> This issue should also address the serialization interfaces used 
> within Copycat, which translate the runtime data into serialized byte[] form. 
> It is important that these be considered together because the data format can 
> be used in multiple ways (records, partition IDs, partition offsets), so it 
> and the corresponding serializers must be sufficient for all these use cases.


