I'm implementing an application that reads structured data with an Avro
schema, applies some dynamically configurable transformations, and writes the
data out as Avro again. The problem is that some transformations need to
modify the Avro schema. For example, one transform reads a value from a field,
applies a function to it, and writes the result to a new field. In that case I
need to add the new field to the output schema. I haven't found a really good
way to do this with Avro. What I'm doing right now is reading all the fields
from the old schema, creating a new schema, and copying all the fields over to
it:

Nicely formatted version: 
https://gist.github.com/paulsonnentag/201b8a13cba8ba91e384240cf26c63f1

// ...

// Create a new schema with the fields of the old schema plus the new fields.

val schema = // ... the schema of the input data

// Existing fields of the input schema plus the fields created by transforms
val fields = schema.getFields ++ getNewFields(schema, transforms)

// Fold every field into the record builder, then finish the record
val newSchema = fields
  .foldLeft(
    SchemaBuilder
      .builder(schema.getNamespace)
      .record(schema.getName)
      .fields()
  )((assembler, field: Schema.Field) => {
    assembler
      .name(field.name)
      .`type`(field.schema())
      .noDefault()
      // TODO: find a way to differentiate between explicitly set null
      // defaults and fields which have no default
      //.withDefault(field.defaultValue())
  })
  .endRecord()

// ...

// create new fields like this
new Schema.Field(
  "addedField",
  Schema.createUnion(List(
    Schema.create(Schema.Type.STRING),
    Schema.create(Schema.Type.NULL)
  )),
  null, // doc
  null  // default value
)

Any ideas how this could be done in a way that doesn't feel so hacky?

Thanks,
Paul
