shirshanka commented on a change in pull request #2806: GOBBLIN-957: Add recursion eliminating converter for Avro
URL: https://github.com/apache/incubator-gobblin/pull/2806#discussion_r346610906
##########
File path: gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
##########
@@ -935,4 +941,141 @@ public static Schema overrideNameAndNamespace(Schema input, String nameOverride,
return newSchema;
}
+ @Builder
+ @ToString
+ public static class SchemaEntry {
+ @Getter
+ final String fieldName;
+ final Schema schema;
+ String fullyQualifiedType() {
+ return schema.getFullName();
+ }
+ }
+
+ /**
+ * Drop recursive fields from a Schema. Recursive fields are fields that
refer to types that are part of the
+ * parent tree.
+ * e.g. consider this Schema for a User
+ * {
+ * "type": "record",
+ * "name": "User",
+ * "fields": [
+ * {"name": "name", "type": "string",
+ * {"name": "friend", "type": "User"}
+ * ]
+ * }
+ * the friend field is a recursive field. After recursion has been
eliminated we expect the output Schema to look like
+ * {
+ * "type": "record",
+ * "name": "User",
+ * "fields": [
+ * {"name": "name", "type": "string"}
+ * ]
+ * }
+ *
+ * @param schema
+ * @return a Pair of (The transformed schema with recursion eliminated, A
list of @link{SchemaEntry} objects which
+ * represent the fields that were removed from the original schema)
+ */
+ public static Pair<Schema, List<SchemaEntry>> dropRecursiveFields(Schema
schema) {
+ List<SchemaEntry> recursiveFields = new ArrayList<>();
+ return new Pair(dropRecursive(new SchemaEntry(null, schema),
Collections.EMPTY_LIST, recursiveFields),
+ recursiveFields);
+ }
+
+ /**
+ * Inner recursive method called by {@link #dropRecursiveFields(Schema)}
+ * @param schemaEntry
+ * @param parents
+ * @param fieldsWithRecursion
+ * @return the transformed Schema, null if schema is recursive w.r.t parent
schema traversed so far
+ */
+ private static Schema dropRecursive(SchemaEntry schemaEntry,
List<SchemaEntry> parents, List<SchemaEntry> fieldsWithRecursion) {
+ Schema schema = schemaEntry.schema;
+ // ignore primitive fields
+ switch (schema.getType()) {
+ case UNION:{
+ List<Schema> unionTypes = schema.getTypes();
+ List<Schema> copiedUnionTypes = new ArrayList<Schema>();
+ for (Schema unionSchema: unionTypes) {
+ SchemaEntry unionSchemaEntry = new SchemaEntry(
+ schemaEntry.fieldName, unionSchema);
+ copiedUnionTypes.add(dropRecursive(unionSchemaEntry, parents,
fieldsWithRecursion));
+ }
+ if (copiedUnionTypes.stream().anyMatch(x -> x == null)) {
+ // one or more types in the union are referring to a parent type
(directly recursive),
+ // entire union must be dropped
+ return null;
+ }
+ else {
+ Schema copySchema = Schema.createUnion(copiedUnionTypes);
+ copyProperties(schema, copySchema);
+ return copySchema;
+ }
+ }
+ case RECORD:{
+ // check if the type of this schema matches any in the parents list
+ if (parents.stream().anyMatch(parent ->
parent.fullyQualifiedType().equals(schemaEntry.fullyQualifiedType()))) {
+ fieldsWithRecursion.add(schemaEntry);
+ return null;
+ }
+ List<SchemaEntry> newParents = new ArrayList<>(parents);
+ newParents.add(schemaEntry);
+ List<Schema.Field> copiedSchemaFields = new ArrayList<>();
+ for (Schema.Field field: schema.getFields()) {
+ String fieldName = schemaEntry.fieldName != null ?
schemaEntry.fieldName + "." + field.name() : field.name();
+ SchemaEntry fieldSchemaEntry = new SchemaEntry(fieldName,
field.schema());
+ Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry,
newParents, fieldsWithRecursion);
+ if (copiedFieldSchema == null) {
+ } else {
+ Schema.Field copiedField =
+ new Schema.Field(field.name(), copiedFieldSchema, field.doc(),
field.defaultValue(), field.order());
+ copyFieldProperties(field, copiedField);
+ copiedSchemaFields.add(copiedField);
+ }
+ }
+ if (copiedSchemaFields.size() > 0) {
+ Schema copiedRecord = Schema.createRecord(schema.getName(),
schema.getDoc(), schema.getNamespace(),
+ schema.isError());
+ copiedRecord.setFields(copiedSchemaFields);
+ copyProperties(schema, copiedRecord);
+ return copiedRecord;
+ } else {
+ return null;
+ }
+ }
+ case ARRAY: {
+ Schema itemSchema = schema.getElementType();
+ SchemaEntry itemSchemaEntry = new SchemaEntry(schemaEntry.fieldName,
itemSchema);
+ Schema copiedItemSchema = dropRecursive(itemSchemaEntry, parents,
fieldsWithRecursion);
+ if (copiedItemSchema == null) {
+ return null;
+ } else {
+ Schema copiedArraySchema = Schema.createArray(copiedItemSchema);
+ copyProperties(schema, copiedArraySchema);
+ return copiedArraySchema;
+ }
+ }
+ case MAP: {
+ Schema valueSchema = schema.getValueType();
+ SchemaEntry valueSchemaEntry = new SchemaEntry(schemaEntry.fieldName,
valueSchema);
+ Schema copiedValueSchema = dropRecursive(valueSchemaEntry, parents,
fieldsWithRecursion);
+ if (copiedValueSchema == null) {
+ return null;
+ } else {
+ Schema copiedMapSchema = Schema.createMap(copiedValueSchema);
+ copyProperties(schema, copiedMapSchema);
+ return copiedMapSchema;
+ }
+ }
+ default: {
+ return schema;
+ }
+ }
+ }
+
+ private static void copyFieldProperties(Schema.Field field, Schema.Field
copiedField) {
Review comment:
Those are not copyable; they need to be provided when you create the field.
You'll see that in the code above, where I do:
    Schema.Field copiedField =
        new Schema.Field(field.name(), copiedFieldSchema,
            field.doc(), field.defaultValue(), field.order());
Will add Javadoc to this method to make this clearer.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services