alexeykudinkin commented on code in PR #6806:
URL: https://github.com/apache/hudi/pull/6806#discussion_r981654227


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -45,10 +45,15 @@ public static class Config {
         .sinceVersion("0.13.0")
         .withDocumentation("The Protobuf Message class used as the source for 
the schema.");
 
-    public static final ConfigProperty<Boolean> 
PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = 
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
+    public static final ConfigProperty<Boolean> 
PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = 
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
         .defaultValue(false)
         .sinceVersion("0.13.0")
-        .withDocumentation("When set to false wrapped primitives like 
Int64Value are translated to a record with a single 'value' field instead of 
simply a nullable value");
+        .withDocumentation("When set to true wrapped primitives like 
Int64Value are translated to a record with a single 'value' field instead of 
simply a nullable value");

Review Comment:
   Let's call out what the default behavior is (otherwise the reader has to work 
through a double negation to figure it out).



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -45,10 +45,15 @@ public static class Config {
         .sinceVersion("0.13.0")
         .withDocumentation("The Protobuf Message class used as the source for 
the schema.");
 
-    public static final ConfigProperty<Boolean> 
PROTO_SCHEMA_FLATTEN_WRAPPED_PRIMITIVES = 
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
+    public static final ConfigProperty<Boolean> 
PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = 
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
         .defaultValue(false)
         .sinceVersion("0.13.0")
-        .withDocumentation("When set to false wrapped primitives like 
Int64Value are translated to a record with a single 'value' field instead of 
simply a nullable value");
+        .withDocumentation("When set to true wrapped primitives like 
Int64Value are translated to a record with a single 'value' field instead of 
simply a nullable value");
+
+    public static final ConfigProperty<Boolean> 
PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS = 
ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".timestamps.as.records")
+        .defaultValue(false)
+        .sinceVersion("0.13.0")
+        .withDocumentation("When set to true Timestamp fields are translated 
to a record with a seconds and nanos field, instead of a long with the 
timestamp-micros logical type");

Review Comment:
   Same here



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -58,12 +65,13 @@ public class ProtoConversionUtil {
   /**
    * Creates an Avro {@link Schema} for the provided class. Assumes that the 
class is a protobuf {@link Message}.
    * @param clazz The protobuf class
-   * @param flattenWrappedPrimitives set to true to treat wrapped primitives 
like nullable fields instead of nested messages.
+   * @param wrappedPrimitivesAsRecords set to true to treat wrapped primitives 
like record with a single "value" field instead of simply a nullable field
    * @param maxRecursionDepth the number of times to unravel a recursive proto 
schema before spilling the rest to bytes
+   * @param timestampsAsRecords if true convert {@link Timestamp} to a Record 
with a seconds and nanos field, otherwise convert protobuf {@link Timestamp} to 
a long with the time-mircos logical type.
    * @return An Avro schema
    */
-  public static Schema getAvroSchemaForMessageClass(Class clazz, boolean 
flattenWrappedPrimitives, int maxRecursionDepth) {
-    return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives, 
maxRecursionDepth);
+  public static Schema getAvroSchemaForMessageClass(Class clazz, boolean 
wrappedPrimitivesAsRecords, int maxRecursionDepth, boolean timestampsAsRecords) 
{
+    return AvroSupport.get().getSchema(clazz, wrappedPrimitivesAsRecords, 
maxRecursionDepth, timestampsAsRecords);

Review Comment:
   Similar comment: instead of passing these as params:
   
    - Make `AvroSupport` a non-singleton
    - Pass the config into `AvroSupport` and initialize these as fields there



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -204,34 +226,48 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor 
f, CopyOnWriteMap<Desc
           return schemaFinalizer.apply(Schema.create(Schema.Type.INT));
         case UINT32:
         case INT64:
-        case UINT64:
         case SINT64:
         case FIXED64:
         case SFIXED64:
           return schemaFinalizer.apply(Schema.create(Schema.Type.LONG));
+        case UINT64:
+          return schemaFinalizer.apply(UNSIGNED_LONG_SCHEMA);
         case MESSAGE:
-          String updatedPath = appendFieldNameToPath(path, f.getName());
-          if (flattenWrappedPrimitives && 
WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) {
+          String updatedPath = appendFieldNameToPath(path, 
fieldDescriptor.getName());
+          if (!wrappedPrimitivesAsRecords && 
WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) {
             // all wrapper types have a single field, so we can get the first 
field in the message's schema
-            return 
schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL_SCHEMA, 
getFieldSchema(f.getMessageType().getFields().get(0), recursionDepths, 
flattenWrappedPrimitives, updatedPath,
-                maxRecursionDepth))));
+            return 
schemaFinalizer.apply(makeSchemaNullable(getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0),
 recursionDepths, wrappedPrimitivesAsRecords, updatedPath,
+                maxRecursionDepth, timestampsAsRecords)));
+          }
+          if (!timestampsAsRecords && 
Timestamp.getDescriptor().equals(fieldDescriptor.getMessageType())) {
+            // Handle timestamps as long with logical type
+            return 
schemaFinalizer.apply(makeSchemaNullable(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))));

Review Comment:
   Same comment as above



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) {
         case ENUM:
           return GenericData.get().createEnum(value.toString(), schema);
         case FIXED:
-          return GenericData.get().createFixed(null, ((GenericFixed) 
value).bytes(), schema);
+          if (value instanceof byte[]) {
+            return GenericData.get().createFixed(null, (byte[]) value, schema);
+          }
+          Object unsignedLongValue = value;
+          if (unsignedLongValue instanceof Message) {
+            // Unwrap UInt64Value
+            unsignedLongValue = getWrappedValue(unsignedLongValue);
+          }
+          // convert the long to its unsigned value
+          return DECIMAL_CONVERSION.toFixed(new 
BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema, 
schema.getLogicalType());

Review Comment:
   There's `BigDecimal.valueOf` we can use



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestProtoConversionUtil.java:
##########
@@ -130,10 +141,43 @@ public void recursiveSchema_withOverflow() throws 
Exception {
     
Assertions.assertEquals(input.getChildren(1).getRecurseField().getRecurseField(),
 parsedChildren2Overflow);
   }
 
+  @Test
+  public void oneOfSchema() throws IOException {
+    Schema.Parser parser = new Schema.Parser();
+    Schema convertedSchema = 
parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/oneof_schema.avsc"));
+    WithOneOf input = WithOneOf.newBuilder().setLong(32L).build();
+    GenericRecord actual = 
serializeAndDeserializeAvro(ProtoConversionUtil.convertToAvro(convertedSchema, 
input), convertedSchema);
+
+    GenericData.Record expectedRecord = new 
GenericData.Record(convertedSchema);
+    expectedRecord.put("int", null);
+    expectedRecord.put("long", 32L);
+    expectedRecord.put("message", null);
+    Assertions.assertEquals(expectedRecord, actual);
+  }
+
+  private void assertUnsignedLongCorrectness(Schema convertedSchema, Sample 
input, GenericRecord actual, boolean wellKnownTypesAsRecords) {

Review Comment:
   A good indicator of whether a method is coherent is how easy it is to judge what 
it does simply by looking at its signature. For this one, that's not easy.
   
   I'd suggest actually splitting it in two, with each one shaped like the following:
   
   ```
   assert(Schema.Field fieldSchema, Long expectedValue, ... actual, ...)
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -85,6 +93,8 @@ public static GenericRecord convertToAvro(Schema schema, 
Message message) {
   private static class AvroSupport {
     private static final Schema STRING_SCHEMA = 
Schema.create(Schema.Type.STRING);
     private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);
+    private static final Schema UNSIGNED_LONG_SCHEMA = 
LogicalTypes.decimal(21).addToSchema(Schema.createFixed("unsigned_long", null, 
"org.apache.hudi.protos", 9));

Review Comment:
   Please add a comment explaining where 21 comes from



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor 
descriptor, CopyOnWriteMa
       List<Schema.Field> fields = new 
ArrayList<>(descriptor.getFields().size());
       for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
         // each branch of the schema traversal requires its own recursion 
depth tracking so copy the recursionDepths map
-        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, 
maxRecursionDepth), null, getDefault(f)));
+        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path, 
maxRecursionDepth, timestampsAsRecords),
+            null, getDefault(f)));
       }
       result.setFields(fields);
       return result;
     }
 
-    private Schema getFieldSchema(Descriptors.FieldDescriptor f, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
flattenWrappedPrimitives, String path,
-                                  int maxRecursionDepth) {
-      Function<Schema, Schema> schemaFinalizer =  f.isRepeated() ? 
Schema::createArray : Function.identity();
-      switch (f.getType()) {
+    private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
wrappedPrimitivesAsRecords, String path,
+                                  int maxRecursionDepth, boolean 
timestampsAsRecords) {
+      Function<Schema, Schema> schemaFinalizer = schema -> {
+        Schema updatedSchema = schema;
+        // all fields in the oneof will be treated as nullable
+        if (fieldDescriptor.getContainingOneof() != null && !(schema.getType() 
== Schema.Type.UNION && schema.getTypes().get(0).getType() == 
Schema.Type.NULL)) {

Review Comment:
   Should we instead add an assertion making sure that the passed-in schema is 
nullable?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -58,12 +65,13 @@ public class ProtoConversionUtil {
   /**
    * Creates an Avro {@link Schema} for the provided class. Assumes that the 
class is a protobuf {@link Message}.
    * @param clazz The protobuf class
-   * @param flattenWrappedPrimitives set to true to treat wrapped primitives 
like nullable fields instead of nested messages.
+   * @param wrappedPrimitivesAsRecords set to true to treat wrapped primitives 
like record with a single "value" field instead of simply a nullable field
    * @param maxRecursionDepth the number of times to unravel a recursive proto 
schema before spilling the rest to bytes
+   * @param timestampsAsRecords if true convert {@link Timestamp} to a Record 
with a seconds and nanos field, otherwise convert protobuf {@link Timestamp} to 
a long with the time-mircos logical type.
    * @return An Avro schema
    */
-  public static Schema getAvroSchemaForMessageClass(Class clazz, boolean 
flattenWrappedPrimitives, int maxRecursionDepth) {
-    return AvroSupport.get().getSchema(clazz, flattenWrappedPrimitives, 
maxRecursionDepth);
+  public static Schema getAvroSchemaForMessageClass(Class clazz, boolean 
wrappedPrimitivesAsRecords, int maxRecursionDepth, boolean timestampsAsRecords) 
{

Review Comment:
   Let's pass in the config here instead of individual values (passing individual 
values isn't going to scale well)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -204,34 +226,48 @@ private Schema getFieldSchema(Descriptors.FieldDescriptor 
f, CopyOnWriteMap<Desc
           return schemaFinalizer.apply(Schema.create(Schema.Type.INT));
         case UINT32:
         case INT64:
-        case UINT64:
         case SINT64:
         case FIXED64:
         case SFIXED64:
           return schemaFinalizer.apply(Schema.create(Schema.Type.LONG));
+        case UINT64:
+          return schemaFinalizer.apply(UNSIGNED_LONG_SCHEMA);
         case MESSAGE:
-          String updatedPath = appendFieldNameToPath(path, f.getName());
-          if (flattenWrappedPrimitives && 
WRAPPER_DESCRIPTORS_TO_TYPE.containsKey(f.getMessageType())) {
+          String updatedPath = appendFieldNameToPath(path, 
fieldDescriptor.getName());
+          if (!wrappedPrimitivesAsRecords && 
WRAPPER_DESCRIPTORS_TO_TYPE.contains(fieldDescriptor.getMessageType())) {
             // all wrapper types have a single field, so we can get the first 
field in the message's schema
-            return 
schemaFinalizer.apply(Schema.createUnion(Arrays.asList(NULL_SCHEMA, 
getFieldSchema(f.getMessageType().getFields().get(0), recursionDepths, 
flattenWrappedPrimitives, updatedPath,
-                maxRecursionDepth))));
+            return 
schemaFinalizer.apply(makeSchemaNullable(getFieldSchema(fieldDescriptor.getMessageType().getFields().get(0),
 recursionDepths, wrappedPrimitivesAsRecords, updatedPath,

Review Comment:
   Let's break this expression up for readability, extracting a local variable for the schema



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor 
descriptor, CopyOnWriteMa
       List<Schema.Field> fields = new 
ArrayList<>(descriptor.getFields().size());
       for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
         // each branch of the schema traversal requires its own recursion 
depth tracking so copy the recursionDepths map
-        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, 
maxRecursionDepth), null, getDefault(f)));
+        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path, 
maxRecursionDepth, timestampsAsRecords),
+            null, getDefault(f)));
       }
       result.setFields(fields);
       return result;
     }
 
-    private Schema getFieldSchema(Descriptors.FieldDescriptor f, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
flattenWrappedPrimitives, String path,
-                                  int maxRecursionDepth) {
-      Function<Schema, Schema> schemaFinalizer =  f.isRepeated() ? 
Schema::createArray : Function.identity();
-      switch (f.getType()) {
+    private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
wrappedPrimitivesAsRecords, String path,
+                                  int maxRecursionDepth, boolean 
timestampsAsRecords) {
+      Function<Schema, Schema> schemaFinalizer = schema -> {
+        Schema updatedSchema = schema;
+        // all fields in the oneof will be treated as nullable
+        if (fieldDescriptor.getContainingOneof() != null && !(schema.getType() 
== Schema.Type.UNION && schema.getTypes().get(0).getType() == 
Schema.Type.NULL)) {

Review Comment:
   Let's extract this finalizer as a static method to avoid gotchas with closures



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -173,24 +184,35 @@ private Schema getMessageSchema(Descriptors.Descriptor 
descriptor, CopyOnWriteMa
       List<Schema.Field> fields = new 
ArrayList<>(descriptor.getFields().size());
       for (Descriptors.FieldDescriptor f : descriptor.getFields()) {
         // each branch of the schema traversal requires its own recursion 
depth tracking so copy the recursionDepths map
-        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), flattenWrappedPrimitives, path, 
maxRecursionDepth), null, getDefault(f)));
+        fields.add(new Schema.Field(f.getName(), getFieldSchema(f, new 
CopyOnWriteMap<>(recursionDepths), wrappedPrimitivesAsRecords, path, 
maxRecursionDepth, timestampsAsRecords),
+            null, getDefault(f)));
       }
       result.setFields(fields);
       return result;
     }
 
-    private Schema getFieldSchema(Descriptors.FieldDescriptor f, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
flattenWrappedPrimitives, String path,
-                                  int maxRecursionDepth) {
-      Function<Schema, Schema> schemaFinalizer =  f.isRepeated() ? 
Schema::createArray : Function.identity();
-      switch (f.getType()) {
+    private Schema getFieldSchema(Descriptors.FieldDescriptor fieldDescriptor, 
CopyOnWriteMap<Descriptors.Descriptor, Integer> recursionDepths, boolean 
wrappedPrimitivesAsRecords, String path,
+                                  int maxRecursionDepth, boolean 
timestampsAsRecords) {
+      Function<Schema, Schema> schemaFinalizer = schema -> {
+        Schema updatedSchema = schema;
+        // all fields in the oneof will be treated as nullable
+        if (fieldDescriptor.getContainingOneof() != null && !(schema.getType() 
== Schema.Type.UNION && schema.getTypes().get(0).getType() == 
Schema.Type.NULL)) {
+          updatedSchema = makeSchemaNullable(schema);
+        }
+        if (fieldDescriptor.isRepeated()) {

Review Comment:
   The order of these conditionals should be reversed (to handle a repeated field 
within a oneof). Let's add a test for that.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java:
##########
@@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) {
         case ENUM:
           return GenericData.get().createEnum(value.toString(), schema);
         case FIXED:
-          return GenericData.get().createFixed(null, ((GenericFixed) 
value).bytes(), schema);
+          if (value instanceof byte[]) {
+            return GenericData.get().createFixed(null, (byte[]) value, schema);
+          }
+          Object unsignedLongValue = value;
+          if (unsignedLongValue instanceof Message) {

Review Comment:
   We should assert that the type is indeed UInt64



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to