alexeykudinkin commented on code in PR #6806: URL: https://github.com/apache/hudi/pull/6806#discussion_r981805427
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -314,7 +351,16 @@ private Object convertObject(Schema schema, Object value) { case ENUM: return GenericData.get().createEnum(value.toString(), schema); case FIXED: - return GenericData.get().createFixed(null, ((GenericFixed) value).bytes(), schema); + if (value instanceof byte[]) { + return GenericData.get().createFixed(null, (byte[]) value, schema); + } + Object unsignedLongValue = value; + if (unsignedLongValue instanceof Message) { + // Unwrap UInt64Value + unsignedLongValue = getWrappedValue(unsignedLongValue); + } + // convert the long to its unsigned value + return DECIMAL_CONVERSION.toFixed(new BigDecimal(toUnsignedBigInteger((Long) unsignedLongValue)), schema, schema.getLogicalType()); Review Comment: There's one that takes long ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java: ########## @@ -85,53 +121,58 @@ public static GenericRecord convertToAvro(Schema schema, Message message) { private static class AvroSupport { private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL); + // The max unsigned long value has 20 digits, so a decimal with precision 20 and scale 0 is required to represent all possible values. + // A byte array of length N can store at most floor(log_10(2^(8 × N - 1) - 1)) base 10 digits so we require N = 9. + private static final Schema UNSIGNED_LONG_SCHEMA = LogicalTypes.decimal(20).addToSchema(Schema.createFixed("unsigned_long", null, "org.apache.hudi.protos", 9)); + private static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion(); private static final String OVERFLOW_DESCRIPTOR_FIELD_NAME = "descriptor_full_name"; private static final String OVERFLOW_BYTES_FIELD_NAME = "proto_bytes"; private static final Schema RECURSION_OVERFLOW_SCHEMA = Schema.createRecord("recursion_overflow", null, "org.apache.hudi.proto", false, Arrays.asList(new Schema.Field(OVERFLOW_DESCRIPTOR_FIELD_NAME, STRING_SCHEMA, null, ""), new Schema.Field(OVERFLOW_BYTES_FIELD_NAME, Schema.create(Schema.Type.BYTES), null, "".getBytes()))); - private static final AvroSupport INSTANCE = new AvroSupport(); // A cache of the proto class name paired with whether wrapped primitives should be flattened as the key and the generated avro schema as the value private static final Map<SchemaCacheKey, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>(); // A cache with a key as the pair target avro schema and the proto descriptor for the source and the value as an array of proto field descriptors where the order matches the avro ordering. // When converting from proto to avro, we want to be able to iterate over the fields in the proto in the same order as they appear in the avro schema. private static final Map<Pair<Schema, Descriptors.Descriptor>, Descriptors.FieldDescriptor[]> FIELD_CACHE = new ConcurrentHashMap<>(); - private static final Map<Descriptors.Descriptor, Schema.Type> WRAPPER_DESCRIPTORS_TO_TYPE = getWrapperDescriptorsToType(); - - private static Map<Descriptors.Descriptor, Schema.Type> getWrapperDescriptorsToType() { - Map<Descriptors.Descriptor, Schema.Type> wrapperDescriptorsToType = new HashMap<>(); - wrapperDescriptorsToType.put(StringValue.getDescriptor(), Schema.Type.STRING); - wrapperDescriptorsToType.put(Int32Value.getDescriptor(), Schema.Type.INT); - wrapperDescriptorsToType.put(UInt32Value.getDescriptor(), Schema.Type.INT); - wrapperDescriptorsToType.put(Int64Value.getDescriptor(), Schema.Type.LONG); - wrapperDescriptorsToType.put(UInt64Value.getDescriptor(), Schema.Type.LONG); - wrapperDescriptorsToType.put(BoolValue.getDescriptor(), Schema.Type.BOOLEAN); - wrapperDescriptorsToType.put(BytesValue.getDescriptor(), Schema.Type.BYTES); - wrapperDescriptorsToType.put(DoubleValue.getDescriptor(), Schema.Type.DOUBLE); - wrapperDescriptorsToType.put(FloatValue.getDescriptor(), Schema.Type.FLOAT); - return wrapperDescriptorsToType; - } + private static final Set<Descriptors.Descriptor> WRAPPER_DESCRIPTORS_TO_TYPE = getWrapperDescriptorsToType(); - private AvroSupport() { + private static Set<Descriptors.Descriptor> getWrapperDescriptorsToType() { Review Comment: We can inline this method like following: ``` val descriptors = new HashSet<>(Arrays.asList(...)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org