Thanks for the answer. What I already knew is that each message _somehow_ contains either _some_ schema ID or the full schema. I saw some byte-array manipulation that extracted a _somehow_ defined schema ID from a byte[], which worked, but that's definitely not how it should be done. What I'm looking for is documentation on _how_ to do these things right. I really cannot find a single thing, yet there must be some utility functions, or anything. Is there some developer-first-steps page where I can find answers to:
* How to test whether a byte[] contains the full schema or just an ID?
* How to control whether a message is serialized with the ID or with the full schema?
* How to get the ID from a byte[]?
* How to get the full schema from a byte[]?

I don't have the Confluent platform, and cannot have it, but implementing "get schema by ID" should be an easy task, provided that I have that ID. In my scenario I know that messages will be written using one schema, just in different versions of it. So I just need to know which version it is, so that I can configure the deserializer to enable schema evolution.

thanks in advance,
Martin

On Thu, Aug 1, 2019 at 15:55, Svante Karlsson <svante.karls...@csi.se> wrote:

> In an Avro file the schema is at the beginning, but if you mean single-record serialization like Kafka, then you have to add something that you can use to get hold of the schema. Confluent's Avro encoder for Kafka uses Confluent's schema registry, which uses an int32 as the schema ID. This is prepended (plus a magic byte) to the binary Avro. Thus, using the schema registry again, you can get the writer schema.
>
> /Svante
>
> On Thu, Aug 1, 2019, 15:30 Martin Mucha <alfon...@gmail.com> wrote:
>
>> Hi,
>>
>> just one more question, not strictly related to the subject.
>>
>> Initially I thought I'd be OK with using some initial version of the schema in place of the writer schema. That works, but all columns from schemas older than this initial one would just be ignored. So I need to know EXACTLY the schema which the writer used. I know that Avro messages contain either the full schema or at least its ID. Can you point me to the documentation where this is discussed? In my deserializer I have a byte[] as input, from which I need to get the schema information first in order to be able to deserialize the record. I really do not know how to do that, I'm pretty sure I never saw this anywhere, and I cannot find it anywhere. But in principle it must be possible, since the reader need not necessarily have any control over which schema the writer used.
>>
>> thanks a lot.
>> M.
>>
>> On Tue, Jul 30, 2019 at 18:16, Martin Mucha <alfon...@gmail.com> wrote:
>>
>>> Thank you very much for the in-depth answer. I understand how it works better now, and will test it shortly.
>>> Thank you for your time.
>>>
>>> Martin.
>>>
>>> On Tue, Jul 30, 2019 at 17:09, Ryan Skraba <r...@skraba.com> wrote:
>>>
>>>> Hello! It's the same issue in your example code as with allegro, even with the SpecificDatumReader.
>>>>
>>>> This line:   datumReader = new SpecificDatumReader<>(schema)
>>>> should be:   datumReader = new SpecificDatumReader<>(originalSchema, schema)
>>>>
>>>> In Avro, the original schema is commonly known as the writer schema (the instance that originally wrote the binary data). Schema evolution applies when you are using the constructor of the SpecificDatumReader that takes *both* reader and writer schemas.
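A minimal sketch of the framing Svante describes above, assuming the Confluent wire format (a 0x00 magic byte, then a 4-byte big-endian schema ID, then the raw binary Avro body); the class and method names here are illustrative only, not an existing utility:

import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative helper, assuming the Confluent wire format described above:
// [0x00 magic byte][4-byte big-endian schema id][binary Avro record].
// Note that a bare binary Avro record never embeds its full schema; only Avro
// data files (written with DataFileWriter) carry the writer schema in their header.
public final class ConfluentFraming {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema id

    /** Cheap check: does this payload look like "magic byte + schema id + body"? */
    public static boolean looksFramed(byte[] payload) {
        return payload != null && payload.length > HEADER_LENGTH && payload[0] == MAGIC_BYTE;
    }

    /** Extracts the int32 schema id that follows the magic byte. */
    public static int schemaId(byte[] payload) {
        if (!looksFramed(payload)) {
            throw new IllegalArgumentException("Not a registry-framed Avro payload");
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt(); // ByteBuffer defaults to big-endian
    }

    /** Strips the 5-byte header, leaving the plain binary Avro record. */
    public static byte[] avroBody(byte[] payload) {
        return Arrays.copyOfRange(payload, HEADER_LENGTH, payload.length);
    }
}

With the ID in hand, a deserializer that cannot reach a schema registry could keep a local map from ID to its known schema versions and pass the looked-up writer schema, together with the reader schema, to the datum reader, as in the example that follows.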
>>>>
>>>> As a concrete example, if your original schema was:
>>>>
>>>> {
>>>>   "type": "record",
>>>>   "name": "Simple",
>>>>   "fields": [
>>>>     {"name": "id", "type": "int"},
>>>>     {"name": "name", "type": "string"}
>>>>   ]
>>>> }
>>>>
>>>> And you added a field:
>>>>
>>>> {
>>>>   "type": "record",
>>>>   "name": "SimpleV2",
>>>>   "fields": [
>>>>     {"name": "id", "type": "int"},
>>>>     {"name": "name", "type": "string"},
>>>>     {"name": "description", "type": ["null", "string"]}
>>>>   ]
>>>> }
>>>>
>>>> You could do the following safely, assuming that the Simple and SimpleV2 classes are generated by the avro-maven-plugin:
>>>>
>>>> @Test
>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>     // Write a Simple v1 to bytes using your exact method.
>>>>     byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>
>>>>     // Read as SimpleV2, same as your method but with both the writer and reader schemas.
>>>>     DatumReader<SimpleV2> datumReader =
>>>>         new SpecificDatumReader<>(Simple.getClassSchema(), SimpleV2.getClassSchema());
>>>>     Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>>>     SimpleV2 v2 = datumReader.read(null, decoder);
>>>>
>>>>     assertThat(v2.getId(), is(1));
>>>>     assertThat(v2.getName(), is(new Utf8("name1")));
>>>>     assertThat(v2.getDescription(), nullValue());
>>>> }
>>>>
>>>> This demonstrates with two different schemas and SpecificRecords in the same test, but the same principle applies if it's the same record that has evolved -- you need to know the original schema that wrote the data in order to apply the schema that you're now using for reading.
>>>>
>>>> I hope this clarifies what you are looking for!
>>>>
>>>> All my best, Ryan
>>>>
>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <alfon...@gmail.com> wrote:
>>>> >
>>>> > Thanks for the answer.
>>>> >
>>>> > Actually I have exactly the same behavior with Avro 1.9.0 and the following deserializer in our other app, which uses strictly the Avro codebase, and it fails with the same exceptions. So let's leave the "allegro" library and lots of other tools out of our discussion.
>>>> > I can use whichever approach. All I need is a single way in which I can deserialize a byte[] into a class generated by the avro-maven-plugin, and which will respect the documentation regarding schema evolution. Currently we're using the following deserializer and serializer, and these do not work when it comes to schema evolution. What is the correct way to serialize and deserialize Avro data?
>>>> >
>>>> > I probably don't understand your mention of GenericRecord or GenericDatumReader. I tried to use GenericDatumReader in the deserializer below, but then it seems I got back just a GenericData$Record instance, which I can then use to access an array of instances, which is not what I'm looking for (IIUC), since in that case I could have just used plain old JSON and deserialized it using Jackson, having no schema evolution problems at all. If that's correct, I'd rather stick to SpecificDatumReader and somehow fix it, if possible.
>>>> >
>>>> > What can be done? Or how is schema evolution intended to be used? I found a lot of questions while searching for this answer.
>>>> >
>>>> > thanks!
>>>> > Martin.
>>>> >
>>>> > deserializer:
>>>> >
>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T> targetType,
>>>> >                                                            byte[] data,
>>>> >                                                            boolean useBinaryDecoder) {
>>>> >     try {
>>>> >         if (data == null) {
>>>> >             return null;
>>>> >         }
>>>> >
>>>> >         log.trace("data='{}'", DatatypeConverter.printHexBinary(data));
>>>> >
>>>> >         Schema schema = targetType.newInstance().getSchema();
>>>> >         DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
>>>> >         Decoder decoder = useBinaryDecoder
>>>> >                 ? DecoderFactory.get().binaryDecoder(data, null)
>>>> >                 : DecoderFactory.get().jsonDecoder(schema, new String(data));
>>>> >
>>>> >         T result = targetType.cast(datumReader.read(null, decoder));
>>>> >         log.trace("deserialized data='{}'", result);
>>>> >         return result;
>>>> >     } catch (Exception ex) {
>>>> >         throw new SerializationException("Error deserializing data", ex);
>>>> >     }
>>>> > }
>>>> >
>>>> > serializer:
>>>> >
>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T data, boolean useBinaryDecoder, boolean pretty) {
>>>> >     try {
>>>> >         if (data == null) {
>>>> >             return new byte[0];
>>>> >         }
>>>> >
>>>> >         log.debug("data='{}'", data);
>>>> >         Schema schema = data.getSchema();
>>>> >         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
>>>> >         Encoder binaryEncoder = useBinaryDecoder
>>>> >                 ? EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>> >                 : EncoderFactory.get().jsonEncoder(schema, byteArrayOutputStream, pretty);
>>>> >
>>>> >         DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
>>>> >         datumWriter.write(data, binaryEncoder);
>>>> >
>>>> >         binaryEncoder.flush();
>>>> >         byteArrayOutputStream.close();
>>>> >
>>>> >         byte[] result = byteArrayOutputStream.toByteArray();
>>>> >         log.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
>>>> >         return result;
>>>> >     } catch (IOException ex) {
>>>> >         throw new SerializationException(
>>>> >                 "Can't serialize data='" + data, ex);
>>>> >     }
>>>> > }
>>>> >
>>>> > On Tue, Jul 30, 2019 at 13:48, Ryan Skraba <r...@skraba.com> wrote:
>>>> >>
>>>> >> Hello! Schema evolution relies on both the writer and reader schemas being available.
>>>> >>
>>>> >> It looks like the allegro tool you are using is using the GenericDatumReader that assumes the reader and writer schema are the same:
>>>> >>
>>>> >> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>> >>
>>>> >> I do not believe that the "default" value is taken into account for data that is strictly missing from the binary input, just when a field is known to be in the reader schema but missing from the original writer.
>>>> >>
>>>> >> You may have more luck reading the GenericRecord with a GenericDatumReader with both schemas, and using the `convertToJson(record)`.
>>>> >>
>>>> >> I hope this is useful -- Ryan
>>>> >>
>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <alfon...@gmail.com> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>> >> >
>>>> >> > When reading through the Avro documentation, for example [1], I understood that schema evolution is supported, and if I added a column with a specified default, it should be backwards compatible (and even forward compatible when I remove it again). Sounds great, so I added a column defined as:
>>>> >> >
>>>> >> > {
>>>> >> >   "name": "newColumn",
>>>> >> >   "type": ["null", "string"],
>>>> >> >   "default": null,
>>>> >> >   "doc": "something wrong"
>>>> >> > }
>>>> >> >
>>>> >> > and tried to consume some topic having this schema from the beginning; it fails with the message:
>>>> >> >
>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>> >> > at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>> >> > at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>> >> > at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>> >> > at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> >> > at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>> >> >
>>>> >> > To give a little bit more information: the Avro schema defines one top-level type having 2 fields, a string describing the type of the message, and a union of N types. All N-1 non-modified types can be read, but the one updated with the optional, default-having column cannot be read. I'm not sure if this design is strictly speaking correct, but that's not the point (feel free to criticise and recommend a better approach!). I'm after schema evolution, which seems not to be working.
>>>> >> >
>>>> >> > And if we alter the type definition to:
>>>> >> >
>>>> >> > "type": "string",
>>>> >> > "default": ""
>>>> >> >
>>>> >> > it still does not work, and the generated error is:
>>>> >> >
>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
>>>> >> > at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>> >> > at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
>>>> >> > at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> > at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> >> > at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>> >> >
>>>> >> > Am I doing something wrong?
>>>> >> >
>>>> >> > thanks,
>>>> >> > Martin.
>>>> >> >
>>>> >> > [1] https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
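A minimal, self-contained sketch of the resolution discussed in this thread, using simplified stand-in schemas rather than the real ones above: bytes written with an old writer schema are decoded against the evolved reader schema by giving the GenericDatumReader *both* schemas, so the ResolvingDecoder can fill in the defaulted field instead of misreading the byte stream.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Stand-in schemas for illustration only; the real schemas in this thread differ.
public class SchemaEvolutionSketch {

    static final Schema WRITER_V1 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"}]}");

    static final Schema READER_V2 = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"int\"},"
            + "{\"name\":\"newColumn\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    public static void main(String[] args) throws IOException {
        // Write a record with the old (writer) schema, as an old producer would have.
        GenericRecord oldRecord = new GenericData.Record(WRITER_V1);
        oldRecord.put("id", 42);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER_V1).write(oldRecord, encoder);
        encoder.flush();

        // Read it back with the evolved schema, supplying BOTH writer and reader schemas
        // so the added field gets its default instead of being decoded from stale bytes.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord evolved =
                new GenericDatumReader<GenericRecord>(WRITER_V1, READER_V2).read(null, decoder);

        System.out.println(evolved); // {"id": 42, "newColumn": null}
    }
}

Decoding the same old bytes against the new schema alone (a reader constructed with only one schema) makes the decoder misinterpret the byte stream, which is how failures like the two stack traces above can arise.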