Thanks for answer.

What I knew already is, that in each message there is _somehow_ present
either _some_ schema ID or full schema. I saw some byte array manipulations
to get _somehow_ defined schema ID from byte[], which worked, but that's
definitely not how it should be done. What I'm looking for is some
documentation of _how_ to do these things right. I really cannot find a
single thing, yet there must be some util functions, or anything. Is there
some devel-first-steps page, where can I find answers for:

* How to test, whether byte[] contains full schema or just id?
* How to control, whether message is serialized with ID or with full schema?
* how to get ID from byte[]?
* how to get full schema from byte[]?

I don't have confluent platform, and cannot have it, but implementing "get
schema by ID" should be easy task, provided, that I have that ID. In my
scenario I know, that message will be written using one schema, just
different versions of it. So I just need to know, which version it is, so
that I can configure deserializer to enable schema evolution.

thanks in advance,
Martin

čt 1. 8. 2019 v 15:55 odesílatel Svante Karlsson <svante.karls...@csi.se>
napsal:

> In an avrofile the schema is in the beginning but if you refer a single
> record serialization like Kafka then you have to add something that you can
> use to get hold of the schema. Confluents avroencoder for Kafka uses
> confluents schema registry that uses int32 as schema Id. This is prepended
> (+a magic byte) to the binary avro. Thus using the schema registry again
> you can get the writer schema.
>
> /Svante
>
> On Thu, Aug 1, 2019, 15:30 Martin Mucha <alfon...@gmail.com> wrote:
>
>> Hi,
>>
>> just one more question, not strictly related to the subject.
>>
>> Initially I though I'd be OK with using some initial version of schema in
>> place of writer schema. That works, but all columns from schema older than
>> this initial one would be just ignored. So I need to know EXACTLY the
>> schema, which writer used. I know, that avro messages contains either full
>> schema or at least it's ID. Can you point me to the documentation, where
>> this is discussed? So in my deserializer I have byte[] as a input, from
>> which I need to get the schema information first, in order to be able to
>> deserialize the record. I really do not know how to do that, I'm pretty
>> sure I never saw this anywhere, and I cannot find it anywhere. But in
>> principle it must be possible, since reader need not necessarily have any
>> control of which schema writer used.
>>
>> thanks a lot.
>> M.
>>
>> út 30. 7. 2019 v 18:16 odesílatel Martin Mucha <alfon...@gmail.com>
>> napsal:
>>
>>> Thank you very much for in depth answer. I understand how it works now
>>> better, will test it shortly.
>>> Thank you for your time.
>>>
>>> Martin.
>>>
>>> út 30. 7. 2019 v 17:09 odesílatel Ryan Skraba <r...@skraba.com> napsal:
>>>
>>>> Hello!  It's the same issue in your example code as allegro, even with
>>>> the SpecificDatumReader.
>>>>
>>>> This line: datumReader = new SpecificDatumReader<>(schema)
>>>> should be: datumReader = new SpecificDatumReader<>(originalSchema,
>>>> schema)
>>>>
>>>> In Avro, the original schema is commonly known as the writer schema
>>>> (the instance that originally wrote the binary data).  Schema
>>>> evolution applies when you are using the constructor of the
>>>> SpecificDatumReader that takes *both* reader and writer schemas.
>>>>
>>>> As a concrete example, if your original schema was:
>>>>
>>>> {
>>>>   "type": "record",
>>>>   "name": "Simple",
>>>>   "fields": [
>>>>     {"name": "id", "type": "int"},
>>>>     {"name": "name","type": "string"}
>>>>   ]
>>>> }
>>>>
>>>> And you added a field:
>>>>
>>>> {
>>>>   "type": "record",
>>>>   "name": "SimpleV2",
>>>>   "fields": [
>>>>     {"name": "id", "type": "int"},
>>>>     {"name": "name", "type": "string"},
>>>>     {"name": "description","type": ["null", "string"]}
>>>>   ]
>>>> }
>>>>
>>>> You could do the following safely, assuming that Simple and SimpleV2
>>>> classes are generated from the avro-maven-plugin:
>>>>
>>>> @Test
>>>> public void testSerializeDeserializeEvolution() throws IOException {
>>>>   // Write a Simple v1 to bytes using your exact method.
>>>>   byte[] v1AsBytes = serialize(new Simple(1, "name1"), true, false);
>>>>
>>>>   // Read as Simple v2, same as your method but with the writer and
>>>> reader schema.
>>>>   DatumReader<SimpleV2> datumReader =
>>>>       new SpecificDatumReader<>(Simple.getClassSchema(),
>>>> SimpleV2.getClassSchema());
>>>>   Decoder decoder = DecoderFactory.get().binaryDecoder(v1AsBytes, null);
>>>>   SimpleV2 v2 = datumReader.read(null, decoder);
>>>>
>>>>   assertThat(v2.getId(), is(1));
>>>>   assertThat(v2.getName(), is(new Utf8("name1")));
>>>>   assertThat(v2.getDescription(), nullValue());
>>>> }
>>>>
>>>> This demonstrates with two different schemas and SpecificRecords in
>>>> the same test, but the same principle applies if it's the same record
>>>> that has evolved -- you need to know the original schema that wrote
>>>> the data in order to apply the schema that you're now using for
>>>> reading.
>>>>
>>>> I hope this clarifies what you are looking for!
>>>>
>>>> All my best, Ryan
>>>>
>>>>
>>>>
>>>> On Tue, Jul 30, 2019 at 3:30 PM Martin Mucha <alfon...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Thanks for answer.
>>>> >
>>>> > Actually I have exactly the same behavior with avro 1.9.0 and
>>>> following deserializer in our other app, which uses strictly avro codebase,
>>>> and failing with same exceptions. So lets leave "allegro" library and lots
>>>> of other tools out of it in our discussion.
>>>> > I can use whichever aproach. All I need is single way, where I can
>>>> deserialize byte[] into class generated by avro-maven-plugin, and which
>>>> will respect documentation regarding schema evolution. Currently we're
>>>> using following deserializer and serializer, and these does not work when
>>>> it comes to schema evolution. What is the correct way to serialize and
>>>> deserializer avro data?
>>>> >
>>>> > I probably don't understand your mention about GenericRecord or
>>>> GenericDatumReader. I tried to use GenericDatumReader in deserializer
>>>> below, but then it seems I got back just GenericData$Record instance, which
>>>> I can use then to access array of instances, which is not what I'm looking
>>>> for(IIUC), since in that case I could have just use plain old JSON and
>>>> deserialize it using jackson having no schema evolution problems at all. If
>>>> that's correct, I'd rather stick to SpecificDatumReader, and somehow fix it
>>>> if possible.
>>>> >
>>>> > What can be done? Or how schema evolution is intended to be used? I
>>>> found a lots of question searching for this answer.
>>>> >
>>>> > thanks!
>>>> > Martin.
>>>> >
>>>> > deserializer:
>>>> >
>>>> > public static <T extends SpecificRecordBase> T deserialize(Class<T>
>>>> targetType,
>>>> >                                                                byte[]
>>>> data,
>>>> >
>>>> boolean useBinaryDecoder) {
>>>> >         try {
>>>> >             if (data == null) {
>>>> >                 return null;
>>>> >             }
>>>> >
>>>> >             log.trace("data='{}'",
>>>> DatatypeConverter.printHexBinary(data));
>>>> >
>>>> >             Schema schema = targetType.newInstance().getSchema();
>>>> >             DatumReader<GenericRecord> datumReader = new
>>>> SpecificDatumReader<>(schema);
>>>> >             Decoder decoder = useBinaryDecoder
>>>> >                     ? DecoderFactory.get().binaryDecoder(data, null)
>>>> >                     : DecoderFactory.get().jsonDecoder(schema, new
>>>> String(data));
>>>> >
>>>> >             T result = targetType.cast(datumReader.read(null,
>>>> decoder));
>>>> >             log.trace("deserialized data='{}'", result);
>>>> >             return result;
>>>> >         } catch (Exception ex) {
>>>> >             throw new SerializationException("Error deserializing
>>>> data", ex);
>>>> >         }
>>>> >     }
>>>> >
>>>> > serializer:
>>>> > public static <T extends SpecificRecordBase> byte[] serialize(T data,
>>>> boolean useBinaryDecoder, boolean pretty) {
>>>> >         try {
>>>> >             if (data == null) {
>>>> >                 return new byte[0];
>>>> >             }
>>>> >
>>>> >             log.debug("data='{}'", data);
>>>> >             Schema schema = data.getSchema();
>>>> >             ByteArrayOutputStream byteArrayOutputStream = new
>>>> ByteArrayOutputStream();
>>>> >             Encoder binaryEncoder = useBinaryDecoder
>>>> >                     ?
>>>> EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null)
>>>> >                     : EncoderFactory.get().jsonEncoder(schema,
>>>> byteArrayOutputStream, pretty);
>>>> >
>>>> >             DatumWriter<GenericRecord> datumWriter = new
>>>> GenericDatumWriter<>(schema);
>>>> >             datumWriter.write(data, binaryEncoder);
>>>> >
>>>> >             binaryEncoder.flush();
>>>> >             byteArrayOutputStream.close();
>>>> >
>>>> >             byte[] result = byteArrayOutputStream.toByteArray();
>>>> >             log.debug("serialized data='{}'",
>>>> DatatypeConverter.printHexBinary(result));
>>>> >             return result;
>>>> >         } catch (IOException ex) {
>>>> >             throw new SerializationException(
>>>> >                     "Can't serialize data='" + data, ex);
>>>> >         }
>>>> >     }
>>>> >
>>>> > út 30. 7. 2019 v 13:48 odesílatel Ryan Skraba <r...@skraba.com>
>>>> napsal:
>>>> >>
>>>> >> Hello!  Schema evolution relies on both the writer and reader schemas
>>>> >> being available.
>>>> >>
>>>> >> It looks like the allegro tool you are using is using the
>>>> >> GenericDatumReader that assumes the reader and writer schema are the
>>>> >> same:
>>>> >>
>>>> >>
>>>> https://github.com/allegro/json-avro-converter/blob/json-avro-converter-0.2.8/converter/src/main/java/tech/allegro/schema/json2avro/converter/JsonAvroConverter.java#L83
>>>> >>
>>>> >> I do not believe that the "default" value is taken into account for
>>>> >> data that is strictly missing from the binary input, just when a
>>>> field
>>>> >> is known to be in the reader schema but missing from the original
>>>> >> writer.
>>>> >>
>>>> >> You may have more luck reading the GenericRecord with a
>>>> >> GenericDatumReader with both schemas, and using the
>>>> >> `convertToJson(record)`.
>>>> >>
>>>> >> I hope this is useful -- Ryan
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Tue, Jul 30, 2019 at 10:20 AM Martin Mucha <alfon...@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Hi,
>>>> >> >
>>>> >> > I've got some issues/misunderstanding of AVRO schema evolution.
>>>> >> >
>>>> >> > When reading through avro documentation, for example [1], I
>>>> understood, that schema evolution is supported, and if I added column with
>>>> specified default, it should be backwards compatible (and even forward when
>>>> I remove it again). Sounds great, so I added column defined as:
>>>> >> >
>>>> >> >         {
>>>> >> >           "name": "newColumn",
>>>> >> >           "type": ["null","string"],
>>>> >> >           "default": null,
>>>> >> >           "doc": "something wrong"
>>>> >> >         }
>>>> >> >
>>>> >> > and try to consumer some topic having this schema from beginning,
>>>> it fails with message:
>>>> >> >
>>>> >> > Caused by: java.lang.ArrayIndexOutOfBoundsException: 5
>>>> >> >     at
>>>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
>>>> >> >     at org.apache.avro.io
>>>> .ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>>>> >> >     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>>>> >> >     at org.apache.avro.io
>>>> .ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> >> >     at
>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>> >> > to give a little bit more information. Avro schema defines one top
>>>> level type, having 2 fields. String describing type of message, and union
>>>> of N types. All N-1, non-modified types can be read, but one updated with
>>>> optional, default-having column cannot be read. I'm not sure if this design
>>>> is strictly speaking correct, but that's not the point (feel free to
>>>> criticise and recommend better approach!). I'm after schema evolution,
>>>> which seems not to be working.
>>>> >> >
>>>> >> >
>>>> >> > And if we alter type definition to:
>>>> >> >
>>>> >> > "type": "string",
>>>> >> > "default": ""
>>>> >> > it still does not work and generated error is:
>>>> >> >
>>>> >> > Caused by: org.apache.avro.AvroRuntimeException: Malformed data.
>>>> Length is negative: -1
>>>> >> >     at org.apache.avro.io
>>>> .BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
>>>> >> >     at org.apache.avro.io
>>>> .BinaryDecoder.readString(BinaryDecoder.java:263)
>>>> >> >     at org.apache.avro.io
>>>> .ResolvingDecoder.readString(ResolvingDecoder.java:201)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>> >> >     at
>>>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>> >> >     at
>>>> tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToJson(JsonAvroConverter.java:83)
>>>> >> >
>>>> >> > Am I doing something wrong?
>>>> >> >
>>>> >> > thanks,
>>>> >> > Martin.
>>>> >> >
>>>> >> > [1]
>>>> https://docs.oracle.com/database/nosql-12.1.3.4/GettingStartedGuide/schemaevolution.html#changeschema-rules
>>>>
>>>

Reply via email to