First of all, you can use Confluent's schema registry as you wish - it's not in the paid bundle as long as you are not hosting Kafka as a service (i.e. Amazon et al.). And I would recommend that you do: it's good and trivial to operate.
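(For illustration only, a minimal producer setup against the registry could look like the fragment below. This is a sketch, not something from this thread: the Confluent serializer class comes from the io.confluent:kafka-avro-serializer dependency, and the broker and registry host names are placeholders.)

    // Sketch: producer wired to Confluent's Avro serializer and schema registry.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");                  // placeholder
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://schema-registry:8081");                   // placeholder
    Producer<String, SpecificRecordBase> producer = new KafkaProducer<>(props);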
Second, take a look at the serializer in my pet project at: https://github.com/bitbouncer/kspp/blob/master/include/kspp/avro/avro_serdes.h (line 96). Note that this encoder/decoder does not support schema evolution, but it discovers the actual written schema and gets an "avro::ValidSchema" from the schema registry on read. And this is what you need. This is of course C++, but you can probably figure out what you need to do.

In the end you will need a REST/gRPC service somewhere that your serializer can use to get an id that you can refer to across your infrastructure. I did write one some years ago but reverted to Confluent's since most people use that.

/svante

On Thu, 1 Aug 2019 at 18:05, Martin Mucha <alfon...@gmail.com> wrote:

> Thanks for the answer!
>
> Ad: "which byte[] are we talking about?" - actually I don't know. Please let's break it down together.
>
> I'm pretty sure that we're not using the Confluent platform (IIUC the paid bundle, right?). I shared a serializer before [1], so you're saying that this won't include either the schema ID or the schema, OK? OK, let's assume that. Next: we're using the Spring Kafka project to take this serialized data and send it over Kafka. So we don't have any schema registry, but in principle it could be possible to include the schema within each message; I just cannot see how that could be done. Spring Kafka requires us to provide org.apache.kafka.clients.producer.ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG, which we did, but it's just a class calling the serializer [1], and from that point on I have no idea how it could figure out which schema was used. The question I'm asking here is: when sending Avro bytes (obtained by the provided serializer [1]), are they, or can they be, somehow paired with the schema used to serialize the data? Is this something Kafka senders do, or can do - include the ID/whole schema somewhere in headers or similar? And when I read Kafka messages, will the schema be (or could it be) stored somewhere in the ConsumerRecord or somewhere like that?
>
> Sorry for the confused questions, but I'm really missing the knowledge to even ask properly.
>
> thanks,
> Martin.
>
> [1]
> public static <T extends SpecificRecordBase> byte[] serialize(T data, boolean useBinaryDecoder, boolean pretty) {
>     try {
>         if (data == null) {
>             return new byte[0];
>         }
>
>         log.debug("data='{}'", data);
>         Schema schema = data.getSchema();
>         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
>         Encoder binaryEncoder = useBinaryDecoder
>                 ? EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>                 : EncoderFactory.get().jsonEncoder(schema, byteArrayOutputStream, pretty);
>
>         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
>         datumWriter.write(data, binaryEncoder);
>
>         binaryEncoder.flush();
>         byteArrayOutputStream.close();
>
>         byte[] result = byteArrayOutputStream.toByteArray();
>         log.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
>         return result;
>     } catch (IOException ex) {
>         throw new SerializationException("Can't serialize data='" + data, ex);
>     }
> }
>
> On Thu, 1 Aug 2019 at 17:06, Svante Karlsson <svante.karls...@csi.se> wrote:
>
>> For clarity: what byte[] are we talking about?
>>
>> You are slightly missing my point if we are speaking about Kafka.
>>
>> Confluent encoding:
>> <byte>  <int32_t>   <avro_binary_payload>
>> 0       schema_id   avro
>>
>> The avro_binary_payload does not in any case contain the schema or the schema id. The schema id is a Confluent thing. (In an Avro file, the schema is prepended by value in the file.)
>>
>> While it's trivial to build a schema registry that, for example, instead gives you an md5 hash of the schema, you have to use it throughout your infrastructure OR use known reader and writer schemas (i.e. hardcoded).
>>
>> In the Confluent world, id=N is the N+1'th registered schema in the database (a Kafka topic), if I remember right. Lose that database and you cannot read your Kafka topics.
>>
>> So you have to use some other encoder, homegrown or not, that embeds either the full schema in every message (expensive) or some id. Does this make sense?
>>
>> /svante
>>
>> On Thu, 1 Aug 2019 at 16:38, Martin Mucha <alfon...@gmail.com> wrote:
>>
>>> Thanks for the answer.
>>>
>>> What I already knew is that in each message there is _somehow_ present either _some_ schema ID or the full schema. I saw some byte array manipulations to get a _somehow_ defined schema ID from the byte[], which worked, but that's definitely not how it should be done. What I'm looking for is some documentation of _how_ to do these things right. I really cannot find a single thing, yet there must be some util functions, or anything. Is there some devel-first-steps page where I can find answers to:
>>>
>>> * How to test whether a byte[] contains the full schema or just an id?
>>> * How to control whether a message is serialized with the ID or with the full schema?
>>> * How to get the ID from a byte[]?
>>> * How to get the full schema from a byte[]?
>>>
>>> I don't have the Confluent platform, and cannot have it, but implementing "get schema by ID" should be an easy task, provided that I have that ID. In my scenario I know that the message will be written using one schema, just different versions of it. So I just need to know which version it is, so that I can configure the deserializer to enable schema evolution.
>>>
>>> thanks in advance,
>>> Martin
>>>
>>> On Thu, 1 Aug 2019 at 15:55, Svante Karlsson <svante.karls...@csi.se> wrote:
>>>
>>>> In an Avro file the schema is at the beginning, but if you mean single-record serialization like Kafka, then you have to add something that you can use to get hold of the schema. Confluent's Avro encoder for Kafka uses Confluent's schema registry, which uses an int32 as the schema id. This is prepended (plus a magic byte) to the binary Avro. Thus, using the schema registry again, you can get the writer schema.
>>>>
>>>> /Svante
>>>>
>>>> On Thu, Aug 1, 2019, 15:30 Martin Mucha <alfon...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> just one more question, not strictly related to the subject.
>>>>>
>>>>> Initially I thought I'd be OK with using some initial version of the schema in place of the writer schema. That works, but all columns from schemas older than this initial one would just be ignored. So I need to know EXACTLY the schema which the writer used. I know that Avro messages contain either the full schema or at least its ID. Can you point me to the documentation where this is discussed? So in my deserializer I have a byte[] as input, from which I need to get the schema information first in order to be able to deserialize the record. I really do not know how to do that; I'm pretty sure I never saw this anywhere, and I cannot find it anywhere. But in principle it must be possible, since the reader does not necessarily have any control over which schema the writer used.
>>>>>
>>>>> thanks a lot.
>>>>> M.
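Since the Confluent framing described above is just a one-byte magic plus a big-endian int32 id in front of the Avro binary, pulling the id out of a Confluent-framed byte[] is only a few lines of Java. The sketch below assumes exactly that layout; the class and method names are made up for illustration, and plain Avro bytes (such as those produced by the serialize() helper quoted above) carry no such header at all.

    import java.nio.ByteBuffer;

    // Sketch of the framing described above: [0x00 magic][int32 schema id][Avro binary payload].
    public final class ConfluentFraming {

        // Returns the registry id of the writer schema.
        public static int schemaId(byte[] value) {
            ByteBuffer buf = ByteBuffer.wrap(value);
            byte magic = buf.get();
            if (magic != 0) {
                throw new IllegalArgumentException("Not Confluent-framed Avro: magic byte was " + magic);
            }
            return buf.getInt();   // big-endian int32, as written by the Confluent serializer
        }

        // Returns the raw Avro binary payload that follows the 5-byte header.
        public static byte[] avroPayload(byte[] value) {
            byte[] payload = new byte[value.length - 5];
            System.arraycopy(value, 5, payload, 0, payload.length);
            return payload;
        }
    }

With the id in hand you still need the writer schema itself from somewhere (a registry lookup, or a hardcoded map of id to schema); you can then pass it to a datum reader together with your current reader schema, as described further down the thread.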
>>>>> On Tue, 30 Jul 2019 at 18:16, Martin Mucha <alfon...@gmail.com> wrote:
>>>>>
>>>>>> Thank you very much for the in-depth answer. I understand better now how it works; I will test it shortly.
>>>>>> Thank you for your time.
>>>>>>
>>>>>> Martin.
>>>>>>
>>>>>> On Tue, 30 Jul 2019 at 17:09, Ryan Skraba <r...@skraba.com> wrote:
>>>>>>
>>>>>>> Hello! It's the same issue in your example code as with allegro, even with the SpecificDatumReader.
>>>>>>>
>>>>>>> This line:  datumReader = new SpecificDatumReader<>(schema)
>>>>>>> should be:  datumReader = new SpecificDatumReader<>(originalSchema, schema)
>>>>>>>
>>>>>>> In Avro, the original schema is commonly known as the writer schema (the instance that originally wrote the binary data). Schema evolution applies when you are using the constructor of the SpecificDatumReader that takes *both* reader and writer schemas.
>>>>>>>
>>>>>>> As a concrete example, if your original schema was:
>>>>>>>
>>>>>>> {
>>>>>>>   "type": "record",
>>>>>>>   "name": "Simple",
>>>>>>>   "fields": [
>>>>>>>     {"name": "id", "type": "int"},
>>>>>>>     {"name": "name", "type": "string"}
>>>>>>>   ]
>>>>>>> }
>>>>>>>
>>>>>>> And you added a field:
>>>>>>>
>>>>>>> {
>>>>>>>   "type": "record",
>>>>>>>   "name": "SimpleV2",
>>>>>>>   "fields": [
>>>>>>>     {"name": "id", "type": "int"},
>>>>>>>     {"name": "name", "type": "string"},
>>>>>>>     {"name": "description", "type": ["null", "string"]}
>>>>>>>   ]
>>>>>>> }
>>>>>>>
>>>>>>> You could do the following safely, assuming that the Simple and SimpleV2 classes are generated from the avro-maven-plugin:
>>>>>>>
>>>>>>> @Test
>>>>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>>>>     // Write a Simple v1 to bytes using your exact method.
>>>>>>>     byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>>>>
>>>>>>>     // Read as Simple v2, same as your method but with the writer and reader schema.
>>>>>>>     DatumReader<SimpleV2> datumReader =
>>>>>>>         new SpecificDatumReader<>(Simple.getClassSchema(), SimpleV2.getClassSchema());
>>>>>>>     Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>>>>>>     SimpleV2 v2 = datumReader.read(null, decoder);
>>>>>>>
>>>>>>>     assertThat(v2.getId(), is(1));
>>>>>>>     assertThat(v2.getName(), is(new Utf8("name1")));
>>>>>>>     assertThat(v2.getDescription(), nullValue());
>>>>>>> }
>>>>>>>
>>>>>>> This demonstrates with two different schemas and SpecificRecords in the same test, but the same principle applies if it's the same record that has evolved -- you need to know the original schema that wrote the data in order to apply the schema that you're now using for reading.
>>>>>>>
>>>>>>> I hope this clarifies what you are looking for!
>>>>>>>
>>>>>>> All my best, Ryan
>>>>>>>
>>>>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <alfon...@gmail.com> wrote:
>>>>>>> > Thanks for the answer.
>>>>>>> >
>>>>>>> > Actually I have exactly the same behavior with Avro 1.9.0 and the following deserializer in our other app, which uses strictly the Avro codebase, and it fails with the same exceptions. So let's leave the "allegro" library and lots of other tools out of our discussion.
>>>>>>> > I can use whichever approach. All I need is a single way in which I can deserialize a byte[] into a class generated by the avro-maven-plugin, and which will respect the documentation regarding schema evolution. Currently we're using the following deserializer and serializer, and these do not work when it comes to schema evolution. What is the correct way to serialize and deserialize Avro data?
>>>>>>> >
>>>>>>> > I probably don't understand your point about GenericRecord or GenericDatumReader. I tried to use GenericDatumReader in the deserializer below, but then it seems I got back just a GenericData$Record instance, which I can then use to access an array of instances, which is not what I'm looking for (IIUC), since in that case I could have just used plain old JSON and deserialized it using Jackson, having no schema evolution problems at all. If that's correct, I'd rather stick to SpecificDatumReader and somehow fix it if possible.
>>>>>>> >
>>>>>>> > What can be done? Or how is schema evolution intended to be used? I found a lot of questions searching for this answer.
>>>>>>> >
>>>>>>> > thanks!
>>>>>>> > Martin.
>>>>>>> >
>>>>>>> > deserializer:
>>>>>>> >
>>>>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T> targetType,
>>>>>>> >                                                            byte[] data,
>>>>>>> >                                                            boolean useBinaryDecoder) {
>>>>>>> >     try {
>>>>>>> >         if (data == null) {
>>>>>>> >             return null;
>>>>>>> >         }
>>>>>>> >
>>>>>>> >         log.trace("data='{}'", DatatypeConverter.printHexBinary(data));
>>>>>>> >
>>>>>>> >         Schema schema = targetType.newInstance().getSchema();
>>>>>>> >         DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
>>>>>>> >         Decoder decoder = useBinaryDecoder
>>>>>>> >                 ? DecoderFactory.get().binaryDecoder(data, null)
>>>>>>> >                 : DecoderFactory.get().jsonDecoder(schema, new String(data));
>>>>>>> >
>>>>>>> >         T result = targetType.cast(datumReader.read(null, decoder));
>>>>>>> >         log.trace("deserialized data='{}'", result);
>>>>>>> >         return result;
>>>>>>> >     } catch (Exception ex) {
>>>>>>> >         throw new SerializationException("Error deserializing data", ex);
>>>>>>> >     }
>>>>>>> > }
>>>>>>> >
>>>>>>> > serializer:
>>>>>>> >
>>>>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T data, boolean useBinaryDecoder, boolean pretty) {
>>>>>>> >     try {
>>>>>>> >         if (data == null) {
>>>>>>> >             return new byte[0];
>>>>>>> >         }
>>>>>>> >
>>>>>>> >         log.debug("data='{}'", data);
>>>>>>> >         Schema schema = data.getSchema();
>>>>>>> >         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
>>>>>>> >         Encoder binaryEncoder = useBinaryDecoder
>>>>>>> >                 ? EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>>>>> >                 : EncoderFactory.get().jsonEncoder(schema, byteArrayOutputStream, pretty);
>>>>>>> >
>>>>>>> >         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
>>>>>>> >         datumWriter.write(data, binaryEncoder);
>>>>>>> >
>>>>>>> >         binaryEncoder.flush();
>>>>>>> >         byteArrayOutputStream.close();
>>>>>>> >
>>>>>>> >         byte[] result = byteArrayOutputStream.toByteArray();
>>>>>>> >         log.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
>>>>>>> >         return result;
>>>>>>> >     } catch (IOException ex) {
>>>>>>> >         throw new SerializationException("Can't serialize data='" + data, ex);
>>>>>>> >     }
>>>>>>> > }
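The fix Ryan describes further up boils down to giving this deserializer the writer schema as well. A possible adjustment is sketched below; the extra Schema parameter is the writer schema, which the caller has to obtain somehow (a registry lookup, a message header, or a hardcoded map of version to schema - this thread does not prescribe which), and the method otherwise reuses the same imports and SerializationException as the helper quoted above.

    // Sketch only: the quoted deserialize() with an explicit writer schema added.
    public static <T extends SpecificRecordBase> T deserialize(Class<T> targetType,
                                                               Schema writerSchema,
                                                               byte[] data) {
        try {
            if (data == null || data.length == 0) {
                return null;
            }
            // Reader schema: the one compiled into the generated class.
            Schema readerSchema = targetType.newInstance().getSchema();
            // Passing BOTH schemas lets Avro resolve differences between versions
            // (added fields with defaults, removed fields, reordered fields).
            DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            return datumReader.read(null, decoder);
        } catch (Exception ex) {
            throw new SerializationException("Error deserializing data", ex);
        }
    }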
>>>>>>> > On Tue, 30 Jul 2019 at 13:48, Ryan Skraba <r...@skraba.com> wrote:
>>>>>>> >
>>>>>>> >> Hello! Schema evolution relies on both the writer and reader schemas being available.
>>>>>>> >>
>>>>>>> >> It looks like the allegro tool you are using is using the GenericDatumReader that assumes the reader and writer schema are the same:
>>>>>>> >>
>>>>>>> >> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>>>>> >>
>>>>>>> >> I do not believe that the "default" value is taken into account for data that is strictly missing from the binary input, just when a field is known to be in the reader schema but missing from the original writer.
>>>>>>> >>
>>>>>>> >> You may have more luck reading the GenericRecord with a GenericDatumReader with both schemas, and using the `convertToJson(record)`.
>>>>>>> >>
>>>>>>> >> I hope this is useful -- Ryan
>>>>>>> >>
>>>>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <alfon...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> > Hi,
>>>>>>> >> >
>>>>>>> >> > I've got some issues/misunderstanding of Avro schema evolution.
>>>>>>> >> >
>>>>>>> >> > When reading through the Avro documentation, for example [1], I understood that schema evolution is supported, and that if I add a column with a specified default, it should be backwards compatible (and even forward compatible when I remove it again). Sounds great, so I added a column defined as:
>>>>>>> >> >
>>>>>>> >> > {
>>>>>>> >> >   "name": "newColumn",
>>>>>>> >> >   "type": ["null", "string"],
>>>>>>> >> >   "default": null,
>>>>>>> >> >   "doc": "something wrong"
>>>>>>> >> > }
>>>>>>> >> >
>>>>>>> >> > and when I try to consume a topic having this schema from the beginning, it fails with the message:
>>>>>>> >> >
>>>>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>>>>> >> >     at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>>>>> >> >     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>>>>> >> >     at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>> >> >     at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>>> >> >
>>>>>>> >> > To give a little bit more information: the Avro schema defines one top-level type having 2 fields - a string describing the type of the message, and a union of N types. All N-1 non-modified types can be read, but the one updated with the optional, default-having column cannot be read. I'm not sure if this design is strictly speaking correct, but that's not the point (feel free to criticise and recommend a better approach!). I'm after schema evolution, which seems not to be working.
>>>>>>> >> >
>>>>>>> >> > And if we alter the type definition to:
>>>>>>> >> >
>>>>>>> >> >   "type": "string",
>>>>>>> >> >   "default": ""
>>>>>>> >> >
>>>>>>> >> > it still does not work and the generated error is:
>>>>>>> >> >
>>>>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
>>>>>>> >> >     at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>>>>> >> >     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>>>>>>> >> >     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>>>>> >> >     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>>>>> >> >     at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>>>>> >> >
>>>>>>> >> > Am I doing something wrong?
>>>>>>> >> >
>>>>>>> >> > thanks,
>>>>>>> >> > Martin.
>>>>>>> >> >
>>>>>>> >> > [1] https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
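Tying the original question back to the answers above: both stack traces come from resolving the old binary data against the new schema alone, instead of against the writer schema it was actually written with. A minimal sketch of the two-schema read follows; the class and method names are illustrative only, and it assumes the consumer can obtain the old (writer) schema from somewhere, which is exactly the gap the rest of the thread is about.

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;

    public class EvolvedRead {

        // Reads binary data written with the OLD (writer) schema using the NEW schema as the reader schema.
        // Fields missing on the wire (e.g. "newColumn") come back populated with their declared default.
        public static GenericRecord read(byte[] oldBinary, Schema writerSchema, Schema readerSchema)
                throws IOException {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(oldBinary, null);
            return reader.read(null, decoder);
        }
    }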