I am still curious about the exception thrown when writing the copy,
now that I have resolved creating the record with a Schema. The write
throws UnresolvedUnionException:
```
Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name"...
at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
```
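
My current guess, which I have not confirmed against my real schema, is that the union branch is looked up by the full name (namespace plus name) of the value's own schema. If the nested record values copied from the original record still carry the old namespace while the writer schema uses the new one, the lookup would find no branch. Below is a plain-Java sketch of that idea. It is not Avro code, and the class, method, and record names are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a by-name union lookup; not the Avro implementation.
final class UnionLookupSketch {

    // Return the index of the branch whose full name equals the full name
    // of the value's own schema, or fail if there is no such branch.
    static int resolveUnion(List<String> branchFullNames, String valueFullName) {
        int index = branchFullNames.indexOf(valueFullName);
        if (index < 0) {
            throw new IllegalStateException(
                "Not in union " + branchFullNames + ": " + valueFullName);
        }
        return index;
    }

    public static void main(String[] args) {
        // Assumed writer-side union: null or a record under the NEW namespace.
        List<String> union = Arrays.asList("null", "new.namespace.EventMetadata");

        // A value built against the new schema resolves to a branch.
        System.out.println(resolveUnion(union, "new.namespace.EventMetadata"));

        // A value copied from the original record still has the OLD full name.
        try {
            resolveUnion(union, "old.namespace.EventMetadata");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this is what is happening, copying the top-level values by index would not be enough, and nested records would also need to be rebuilt against the new schema.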
Thanks,
Colin
On Mon, Sep 21, 2020 at 10:46 PM Mika Ristimaki
<[email protected]> wrote:
>
> Oh, you are running this in Spark. Even the newest version of Spark Core
> (3.0.1) has a dependency on Avro 1.8.2, and the copy constructor that Ryan
> was talking about was added in Avro 1.9. Although I don't understand how this
>
> List<Schema.Field> clonedFields = existingFields.stream()
> .map(f -> new Schema.Field(f, f.schema()))
> .collect(Collectors.toList());
>
> throws an NPE, since it shouldn't even compile: such a constructor
> shouldn't exist in Avro 1.8.2.
>
> -Mika
>
> On Sep 22 2020, at 7:59 am, Colin Williams
> <[email protected]> wrote:
>
> > *I tried your copy also and got the same UnresolvedUnionException*
> >
> > On Mon, Sep 21, 2020 at 9:57 PM Colin Williams
> > <[email protected]> wrote:
> >>
> >> Hi, Mika
> >>
> >> Sorry I wasn't clearer in my previous emails. After my first email,
> >> because I was getting an NPE when working with the Schema, I fetched
> >> the Schema elsewhere and applied it with
> >>
> >> final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> >>
> >> Then I put aside that particular issue for the time being.
> >>
> >>
> >> Then I provided part of the related exception when attempting to write
> >> the copy. Unfortunately I can't currently provide the schema but maybe
> >> I will figure out a way to provide a reproduction eventually.
> >>
> >> E.g. the write throws UnresolvedUnionException:
> >> ```
> >> Caused by: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name"...
> >> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:853)
> >> at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:249)
> >> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:142)
> >> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> >> at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
> >> at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
> >> at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
> >> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> >> at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> >> ```
> >>
> >> I tried your copy also and got an NPE. I thought that it might be
> >> somewhat obvious from the exception message and details what is
> >> causing the exception.
> >>
> >> It might be that I can't do this within a Map operation in a
> >> distributed platform like Spark?
> >>
> >>
> >> Thanks and Best Regards,
> >>
> >> Colin
> >>
> >>
> >>
> >> On Mon, Sep 21, 2020 at 6:22 AM Mika Ristimaki
> >> <[email protected]> wrote:
> >> >
> >> > Hi Colin,
> >> >
> >> > You are omitting some details from your mail (such as the actual schema),
> >> > but I suspect that in your first email you tried to do something like this:
> >> >
> >> > Schema SCHEMA = SchemaBuilder
> >> > .record("test")
> >> > .namespace("foo")
> >> > .fields()
> >> > .name("foo").type().stringType().noDefault()
> >> > .name("bar").type().optional().stringType()
> >> > .endRecord();
> >> >
> >> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> >> > castRecord.put("foo", "foo value");
> >> > castRecord.put("bar", "bar value");
> >> >
> >> > Schema clonedSchema = Schema.createRecord("othertest",
> >> > "",
> >> > "bar",
> >> > false,
> >> > SCHEMA.getFields());
> >> >
> >> > And this didn't work because as Ryan explained schema fields cannot be
> >> > reused. So he suggested that the fields should be copied first.
> >> >
> >> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >> > .map(f -> new Schema.Field(f, f.schema()))
> >> > .collect(Collectors.toList());
> >> >
> >> > I tested it and it works fine. You should use it like this:
> >> >
> >> > Schema SCHEMA = SchemaBuilder
> >> > .record("test")
> >> > .namespace("foo")
> >> > .fields()
> >> > .name("foo").type().stringType().noDefault()
> >> > .name("bar").type().optional().stringType()
> >> > .endRecord();
> >> >
> >> > GenericRecord castRecord = new GenericData.Record(SCHEMA);
> >> > castRecord.put("foo", "foo value");
> >> > castRecord.put("bar", "bar value");
> >> >
> >> > List<Schema.Field> clonedFields = SCHEMA.getFields().stream()
> >> > .map(f -> new Schema.Field(f, f.schema()))
> >> > .collect(Collectors.toList());
> >> > Schema clonedSchema = Schema.createRecord("othertest",
> >> > "",
> >> > "bar",
> >> > false,
> >> > clonedFields);
> >> >
> >> > final GenericRecord clonedRecord = new GenericData.Record(clonedSchema);
> >> > for (int i = 0; i < clonedFields.size(); i++) {
> >> > clonedRecord.put(i, castRecord.get(i));
> >> > }
> >> >
> >> > GenericDatumWriter<GenericRecord> writer =
> >> >     new GenericDatumWriter<GenericRecord>(SCHEMA);
> >> > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> >> > Encoder binaryEncoder =
> >> >     EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> >> > writer.write(clonedRecord, binaryEncoder);
> >> >
> >> > Also for more robustness against reordered fields in clonedSchema, you
> >> > should probably do the data copying something like this:
> >> >
> >> > for (Schema.Field field : castRecord.getSchema().getFields()) {
> >> > clonedRecord.put(field.name(), castRecord.get(field.name()));
> >> > }
> >> >
> >> >
> >> > On Sep 20 2020, at 7:33 am, Colin Williams
> >> > <[email protected]> wrote:
> >> >
> >> > > I found a way to get my record schema so I'm no longer throwing the
> >> > > same exception. I am now able to put the record indexes for the cloned
> >> > > record.
> >> > >
> >> > > GenericRecord castRecord = (GenericRecord) input.getPayloadObject();
> >> > > final GenericRecord clonedRecord = new GenericData.Record(SCHEMA);
> >> > > clonedRecord.getSchema().getFields();
> >> > > int length = SCHEMA.getFields().size();
> >> > > for (int i = 0; i < length; i++) {
> >> > > clonedRecord.put(i, castRecord.get(i));
> >> > > }
> >> > >
> >> > > GenericDatumWriter<GenericRecord> writer =
> >> > >     new GenericDatumWriter<GenericRecord>(SCHEMA);
> >> > > ByteArrayOutputStream clonedRecordOutputStream = new ByteArrayOutputStream();
> >> > > Encoder binaryEncoder =
> >> > >     EncoderFactory.get().binaryEncoder(clonedRecordOutputStream, null);
> >> > > writer.write(clonedRecord, binaryEncoder);
> >> > >
> >> > > ^^ Throws UnresolvedUnionException ```Caused by:
> >> > > org.apache.avro.UnresolvedUnionException: Not in union
> >> > > ["null",{"type":"record","name"```
> >> > >
> >> > > The schema is identical between the records, with the exception of the
> >> > > name and namespace, so I don't understand why I'm getting the
> >> > > exception. I can write with the castRecord and its schema. Is this a
> >> > > byte alignment issue caused by the name and namespace? I didn't see
> >> > > them in the record indexes.
> >> > >
> >> > >
> >> > > On Sat, Sep 19, 2020 at 3:04 PM Colin Williams
> >> > > <[email protected]> wrote:
> >> > >>
> >> > >> I gave it a shot, but from a MapFunction that's supposed to be
> >> > >> serializable, and I got an NPE.
> >> > >>
> >> > >> I assume I can't create a new Schema.Field from within that
> >> > >> MapFunction. I didn't seem to have trouble accessing the existing
> >> > >> schema fields.
> >> > >>
> >> > >> > List<Schema.Field> clonedFields = existingFields.stream()
> >> > >> >     .map(f -> new Schema.Field(f, f.schema()))
> >> > >> >     .collect(Collectors.toList());
> >> > >>
> >> > >> On Fri, Sep 18, 2020 at 7:09 PM Colin Williams
> >> > >> <[email protected]> wrote:
> >> > >> >
> >> > >> > Hi Ryan, Thanks for your explanation. I am thinking now that the
> >> > >> > design of AVRO suggests that data and schemas are very planned
> >> > >> > things.
> >> > >> > That changes are planned through versioning and we don't like
> >> > >> > duplicated schemas (when the positioning makes sense).
> >> > >> >
> >> > >> > I have a roundabout way of learning. Sometimes I am working with data
> >> > >> > and I think it's convenient to transform my data programmatically and
> >> > >> > try to obtain a schema from that. Also I think that schemas can become
> >> > >> > cumbersome when many fields are involved in intricate patterns.
> >> > >> >
> >> > >> > I think maybe there are other forms better suited for that.
> >> > >> >
> >> > >> > Regarding your proposals, 1 and 2 seem reasonable to me. But someone like
> >> > >> > myself might also not fully understand the design of AVRO.
> >> > >> > A better exception, or some kind of lead for armchair programmers to
> >> > >> > better understand the exception, would help. Thanks for mentioning the
> >> > >> > copy operation.
> >> > >> >
> >> > >> > Finally I do see something about aliases.
> >> > >> >
> >> > >> > Thanks,
> >> > >> >
> >> > >> > Colin
> >> > >> >
> >> > >> >
> >> > >> >
> >> > >> > On Fri, Sep 18, 2020 at 5:32 AM Ryan Skraba <[email protected]> wrote:
> >> > >> > >
> >> > >> > > Hello Colin, you've hit one bit of fussiness with the Java SDK... you
> >> > >> > > can't reuse a Schema.Field object in two Records, because a field
> >> > >> > > knows its own position in the record[1]. If a field were to belong to
> >> > >> > > two records at different positions, this method would have an
> >> > >> > > ambiguous response.
> >> > >> > >
> >> > >> > > As a workaround, since Avro 1.9, there's a copy constructor that you
> >> > >> > > can use to clone the field:
> >> > >> > >
> >> > >> > > List<Schema.Field> clonedFields = existingFields.stream()
> >> > >> > > .map(f -> new Schema.Field(f, f.schema()))
> >> > >> > > .collect(Collectors.toList());
> >> > >> > >
> >> > >> > > That being said, I don't see any reason we MUST throw an exception.
> >> > >> > > There's a couple of alternative strategies we could use in the Java
> >> > >> > > SDK:
> >> > >> > >
> >> > >> > > 1. If the position is the same in both records, allow the field to be
> >> > >> > > reused (which enables cloning use cases).
> >> > >> > >
> >> > >> > > 2. Make a copy of the field to reuse internally if the position is
> >> > >> > > already set (probably OK, since it's supposed to be immutable).
> >> > >> > >
> >> > >> > > 3. Allow the field to be reused, and throw the exception only if
> >> > >> > > someone calls the position() method later.
> >> > >> > >
> >> > >> > > Any of those sound like a useful change for your use case? Don't
> >> > >> > > hesitate to create a JIRA or contribution if you like!
> >> > >> > >
> >> > >> > > All my best, Ryan
> >> > >> > >
> >> > >> > > On Fri, Sep 18, 2020 at 8:27 AM Colin Williams
> >> > >> > > <[email protected]> wrote:
> >> > >> > > >
> >> > >> > > > Hello,
> >> > >> > > >
> >> > >> > > > I'm trying to understand working with Avro records and schemas
> >> > >> > > > programmatically. I was first trying to create a new schema and
> >> > >> > > > records based on existing records, but with a different name /
> >> > >> > > > namespace. It seems I don't understand getFields() or
> >> > >> > > > createRecord(...). Why can't I use the fields obtained from
> >> > >> > > > getFields() in createRecord()? How would I go about this properly?
> >> > >> > > >
> >> > >> > > > // for an existing record already present
> >> > >> > > > GenericRecord someRecord
> >> > >> > > >
> >> > >> > > > // get a list of existing fields
> >> > >> > > > List<Schema.Field> existingFields = someRecord.getSchema().getFields();
> >> > >> > > >
> >> > >> > > > // schema for new record with existing fields
> >> > >> > > > Schema updatedSchema = Schema.createRecord("UpdatedName",
> >> > >> > > >     "", "avro.com.example.namespace", false, existingFields);
> >> > >> > > >
> >> > >> > > > ^^ throws an exception ^^
> >> > >> > > >
> >> > >> > > > /* Caused by: org.apache.avro.AvroRuntimeException: Field already used: eventMetadata type:UNION pos:0
> >> > >> > > > at org.apache.avro.Schema$RecordSchema.setFields(Schema.java:888)
> >> > >> > > > at org.apache.avro.Schema$RecordSchema.<init>(Schema.java:856)
> >> > >> > > > at org.apache.avro.Schema.createRecord(Schema.java:217)
> >> > >> > > > */
> >> > >> > > >
> >> > >> > > > // copy the values by position into the new record
> >> > >> > > > final int length = existingFields.size();
> >> > >> > > >
> >> > >> > > > GenericRecord clonedRecord = new GenericData.Record(updatedSchema);
> >> > >> > > > for (int i = 0; i < length; i++) {
> >> > >> > > >     clonedRecord.put(i, someRecord.get(i));
> >> > >> > > > }
> >> > >> > > >
> >> > >> > > >
> >> > >> > > > Best Regards,
> >> > >> > > >
> >> > >> > > > Colin Williams
> >> > >
> >
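
For anyone reading the thread from the bottom, the field-reuse restriction Ryan describes (a field knows its own position, so it cannot belong to two records) can be sketched in plain Java. This is a simplified model under my own assumptions, not the Avro source. The `Field` class, its `pos` member, and `attach` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a field that remembers its position in a record.
final class FieldReuseSketch {

    static final class Field {
        final String name;
        int pos = -1; // -1 means "not yet attached to any record"

        Field(String name) {
            this.name = name;
        }

        // Copy constructor: same name, but the copy starts unattached.
        Field(Field other) {
            this.name = other.name;
        }
    }

    // Attach fields to a new "record", assigning each its position.
    // A field that already has a position is rejected.
    static List<Field> attach(List<Field> fields) {
        List<Field> attached = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
            Field f = fields.get(i);
            if (f.pos != -1) {
                throw new IllegalStateException("Field already used: " + f.name);
            }
            f.pos = i;
            attached.add(f);
        }
        return attached;
    }

    public static void main(String[] args) {
        List<Field> original = new ArrayList<>();
        original.add(new Field("foo"));
        original.add(new Field("bar"));
        attach(original); // first record: succeeds

        try {
            attach(original); // second record reusing the same objects
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // Copy each field first, then attach the copies: succeeds.
        List<Field> copies = new ArrayList<>();
        for (Field f : original) {
            copies.add(new Field(f));
        }
        System.out.println(attach(copies).size());
    }
}
```

In this model the copies are independent objects, which is why cloning the fields before building the second record avoids the "Field already used" error.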