alexeykudinkin commented on code in PR #6806: URL: https://github.com/apache/hudi/pull/6806#discussion_r981654227
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java: ########## @@ -45,10 +45,15 @@ public static class Config { .sinceVersion("0.13.0") .withDocumentation("The Protobuf Message class used as the source for the schema."); - public static final ConfigProperty<Boolean> PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers") + public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers") .defaultValue(false) .sinceVersion("0.13.0") - .withDocumentation("When set to false wrapped primitives like Int64Value are translated to a record with a single 'value' field instead of simply a nullable value"); + .withDocumentation("When set to true wrapped primitives like Int64Value are translated to a record with a single 'value' field instead of simply a nullable value"); Review Comment: Let's call out what the default behavior is (otherwise the reader has to work through a double negation to figure it out). ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java: ########## @@ -45,10 +45,15 @@ public static class Config { .sinceVersion("0.13.0") .withDocumentation("The Protobuf Message class used as the source for the schema."); - public static final ConfigProperty<Boolean> PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers") + public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers") .defaultValue(false) .sinceVersion("0.13.0") - .withDocumentation("When set to false wrapped primitives like Int64Value are translated to a record with a single 'value' field instead of simply a nullable value"); + .withDocumentation("When set to true wrapped primitives like Int64Value are translated to a 
record with a single 'value' field instead of simply a nullable value"); + + public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".timestamps.as.records") + .defaultValue(false) + .sinceVersion("0.13.0") + .withDocumentation("When set to true Timestamp fields are translated to a record with a seconds and nanos field, instead of a long with the timestamp-micros logical type"); Review Comment: Same here ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -58,12 +65,13 @@ public class ProtoConversionUtil { /** * Creates an Avro {@link Schema} for the provided class. Assumes that the class is a protobuf {@link Message}. * @param clazz The protobuf class - * @param flattenWrappedPrimitives set to true to treat wrapped primitives like nullable fields instead of nested messages. + * @param wrappedPrimitivesAsRecords set to true to treat wrapped primitives like record with a single "value" field instead of simply a nullable field * @param maxRecursionDepth the number of times to unravel a recursive proto schema before spilling the rest to bytes + * @param timestampsAsRecords if true convert {@link Timestamp} to a Record with a seconds and nanos field, otherwise convert protobuf {@link Timestamp} to a long with the time-mircos logical type. 
* @return An Avro schema */ - public static Schema getAvroSchemaForMessageClass(Class clazz, boolean flattenWrappedPrimitives, int maxRecursionDepth) { - return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives, maxRecursionDepth); + public static Schema getAvroSchemaForMessageClass(Class clazz, boolean wrappedPrimitivesAsRecords, int maxRecursionDepth, boolean timestampsAsRecords) { + return AvroSupport.get().getSchema(clazz, wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords); Review Comment: Similar comment: instead of passing these as params: - Make `AvroSupport` non-singleton - Pass config into `AvroSupport` and init these as fields in there ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -204,34 +226,48 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor f, CopyOnWriteMap<Desc return schemaFinalizer.apply(Schema.create(Schema.Type.INT)); case UINT32: case INT64: - case UINT64: case SINT64: case FIXED64: case SFIXED64: return schemaFinalizer.apply(Schema.create(Schema.Type.LONG)); + case UINT64: + return schemaFinalizer.apply(UNSIGNED_LONG_SCHEMA); case MESSAGE: - String updatedPath = appendFieldNameToPath(path, f.getName()); - if (flattenWrappedPrimitives && WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) { + String updatedPath = appendFieldNameToPath(path, fieldDescriptor.getName()); + if (!wrappedPrimitivesAsRecords && WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) { // all wrapper types have a single field, so we can get the first field in the message's schema - return schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL_SCHEMA, getFieldSchema(f.getMessageType().getFields().get(0), recursionDepths, flattenWrappedPrimitives, updatedPath, - maxRecursionDepth)))); + return schemaFinalizer.apply(makeSchemaNullable(getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0), recursionDepths, 
wrappedPrimitivesAsRecords, updatedPath, + maxRecursionDepth, timestampsAsRecords))); + } + if (!timestampsAsRecords && Timestamp.getDescriptor().equals(fieldDescriptor.getMessageType())) { + // Handle timestamps as long with logical type + return schemaFinalizer.apply(makeSchemaNullable(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))); Review Comment: Same comment as above ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) { case ENUM: return GenericData.get().createEnum(value.toString(), schema); case FIXED: - return GenericData.get().createFixed(null, ((GenericFixed) value).bytes(), schema); + if (value instanceof byte[]) { + return GenericData.get().createFixed(null, (byte[]) value, schema); + } + Object unsignedLongValue = value; + if (unsignedLongValue instanceof Message) { + // Unwrap UInt64Value + unsignedLongValue = getWrappedValue(unsignedLongValue); + } + // convert the long to its unsigned value + return DECIMAL_CONVERSION.toFixed(new BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema, schema.getLogicalType()); Review Comment: There's `BigDecimal.valueOf` we can use ########## hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java: ########## @@ -130,10 +141,43 @@ public void recursiveSchema_withOverflow() throws Exception { Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(), parsedChildren2Overflow); } + @Test + public void oneOfSchema() throws IOException { + Schema.Parser parser = new Schema.Parser(); + Schema convertedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/oneof_schema.avsc")); + WithOneOf input = WithOneOf.newBuilder().setLong(32L).build(); + GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, input), convertedSchema); + + GenericData.Record expectedRecord = new GenericData.Record(convertedSchema); + expectedRecord.put("int", null); + expectedRecord.put("long", 32L); + expectedRecord.put("message", null); + Assertions.assertEquals(expectedRecord, actual); + } + + private void assertUnsignedLongCorrectness(Schema convertedSchema, Sample input, GenericRecord actual, boolean wellKnownTypesAsRecords) { Review Comment: A good indicator of whether a method is coherent is how easy it is to judge what it does simply by looking at its signature. For this one it's not that easy. I'd actually suggest splitting it in two and making each look like the following: ``` assert(Schema.Field fieldSchema, Long expectedValue, ... actual, ...) ``` ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -85,6 +93,8 @@ public static GenericRecord convertToAvro(Schema schema, Message message) { private static class AvroSupport { private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL); + private static final Schema UNSIGNED_LONG_SCHEMA = LogicalTypes.decimal(21).addToSchema(Schema.createFixed("unsigned_long", null, "org.apache.hudi.protos", 9)); Review Comment: Please add a comment explaining where 21 comes from. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor descriptor, CopyOnWriteMa List<Schema.Field> fields = new ArrayList<>(descriptor.getFields().size()); for (Descriptors.FieldDescriptor f : descriptor.getFields()) { // each branch of the schema traversal requires its own recursion depth tracking so copy the recursionDepths map - fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, maxRecursionDepth), null, getDefault(f))); + fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path, maxRecursionDepth, timestampsAsRecords), + null, getDefault(f))); } result.setFields(fields); return result; } - private Schema getFieldSchema(Descriptors.FieldDescriptor f, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean flattenWrappedPrimitives, String path, - int maxRecursionDepth) { - Function<Schema, Schema> schemaFinalizer = f.isRepeated() ? Schema::createArray : Function.identity(); - switch (f.getType()) { + private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean wrappedPrimitivesAsRecords, String path, + int maxRecursionDepth, boolean timestampsAsRecords) { + Function<Schema, Schema> schemaFinalizer = schema -> { + Schema updatedSchema = schema; + // all fields in the oneof will be treated as nullable + if (fieldDescriptor.getContainingOneof() != null && !(schema.getType() == Schema.Type.UNION && schema.getTypes().get(0).getType() == Schema.Type.NULL)) { Review Comment: Should we add an assertion instead, making sure that the passed-in schema is nullable? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -58,12 +65,13 @@ public class ProtoConversionUtil { /** * Creates an Avro {@link Schema} for the provided class. Assumes that the class is a protobuf {@link Message}. * @param clazz The protobuf class - * @param flattenWrappedPrimitives set to true to treat wrapped primitives like nullable fields instead of nested messages. 
+ * @param wrappedPrimitivesAsRecords set to true to treat wrapped primitives like record with a single "value" field instead of simply a nullable field * @param maxRecursionDepth the number of times to unravel a recursive proto schema before spilling the rest to bytes + * @param timestampsAsRecords if true convert {@link Timestamp} to a Record with a seconds and nanos field, otherwise convert protobuf {@link Timestamp} to a long with the time-mircos logical type. * @return An Avro schema */ - public static Schema getAvroSchemaForMessageClass(Class clazz, boolean flattenWrappedPrimitives, int maxRecursionDepth) { - return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives, maxRecursionDepth); + public static Schema getAvroSchemaForMessageClass(Class clazz, boolean wrappedPrimitivesAsRecords, int maxRecursionDepth, boolean timestampsAsRecords) { Review Comment: Let's pass in config in here instead of individual values (it's not gonna scale well) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -204,34 +226,48 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor f, CopyOnWriteMap<Desc return schemaFinalizer.apply(Schema.create(Schema.Type.INT)); case UINT32: case INT64: - case UINT64: case SINT64: case FIXED64: case SFIXED64: return schemaFinalizer.apply(Schema.create(Schema.Type.LONG)); + case UINT64: + return schemaFinalizer.apply(UNSIGNED_LONG_SCHEMA); case MESSAGE: - String updatedPath = appendFieldNameToPath(path, f.getName()); - if (flattenWrappedPrimitives && WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) { + String updatedPath = appendFieldNameToPath(path, fieldDescriptor.getName()); + if (!wrappedPrimitivesAsRecords && WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) { // all wrapper types have a single field, so we can get the first field in the message's schema - return 
schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL_SCHEMA, getFieldSchema(f.getMessageType().getFields().get(0), recursionDepths, flattenWrappedPrimitives, updatedPath, - maxRecursionDepth)))); + return schemaFinalizer.apply(makeSchemaNullable(getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0), recursionDepths, wrappedPrimitivesAsRecords, updatedPath, Review Comment: Let's break this expression up (for readability, extracting val for schema) ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor descriptor, CopyOnWriteMa List<Schema.Field> fields = new ArrayList<>(descriptor.getFields().size()); for (Descriptors.FieldDescriptor f : descriptor.getFields()) { // each branch of the schema traversal requires its own recursion depth tracking so copy the recursionDepths map - fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, maxRecursionDepth), null, getDefault(f))); + fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path, maxRecursionDepth, timestampsAsRecords), + null, getDefault(f))); } result.setFields(fields); return result; } - private Schema getFieldSchema(Descriptors.FieldDescriptor f, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean flattenWrappedPrimitives, String path, - int maxRecursionDepth) { - Function<Schema, Schema> schemaFinalizer = f.isRepeated() ? 
Schema::createArray : Function.identity(); - switch (f.getType()) { + private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean wrappedPrimitivesAsRecords, String path, + int maxRecursionDepth, boolean timestampsAsRecords) { + Function<Schema, Schema> schemaFinalizer = schema -> { + Schema updatedSchema = schema; + // all fields in the oneof will be treated as nullable + if (fieldDescriptor.getContainingOneof() != null && !(schema.getType() == Schema.Type.UNION && schema.getTypes().get(0).getType() == Schema.Type.NULL)) { Review Comment: Let's extract this finalizer as a static method to avoid gotchas w/ closures ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor descriptor, CopyOnWriteMa List<Schema.Field> fields = new ArrayList<>(descriptor.getFields().size()); for (Descriptors.FieldDescriptor f : descriptor.getFields()) { // each branch of the schema traversal requires its own recursion depth tracking so copy the recursionDepths map - fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, maxRecursionDepth), null, getDefault(f))); + fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path, maxRecursionDepth, timestampsAsRecords), + null, getDefault(f))); } result.setFields(fields); return result; } - private Schema getFieldSchema(Descriptors.FieldDescriptor f, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean flattenWrappedPrimitives, String path, - int maxRecursionDepth) { - Function<Schema, Schema> schemaFinalizer = f.isRepeated() ? 
Schema::createArray : Function.identity(); - switch (f.getType()) { + private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor, CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean wrappedPrimitivesAsRecords, String path, + int maxRecursionDepth, boolean timestampsAsRecords) { + Function<Schema, Schema> schemaFinalizer = schema -> { + Schema updatedSchema = schema; + // all fields in the oneof will be treated as nullable + if (fieldDescriptor.getContainingOneof() != null && !(schema.getType() == Schema.Type.UNION && schema.getTypes().get(0).getType() == Schema.Type.NULL)) { + updatedSchema = makeSchemaNullable(schema); + } + if (fieldDescriptor.isRepeated()) { Review Comment: The order of these conditionals should be reversed (repeated field within a oneof). Let's add a test for that. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) { case ENUM: return GenericData.get().createEnum(value.toString(), schema); case FIXED: - return GenericData.get().createFixed(null, ((GenericFixed) value).bytes(), schema); + if (value instanceof byte[]) { + return GenericData.get().createFixed(null, (byte[]) value, schema); + } + Object unsignedLongValue = value; + if (unsignedLongValue instanceof Message) { Review Comment: We should assert that the type is indeed UInt64. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org