ccciudatu commented on a change in pull request #13572:
URL: https://github.com/apache/beam/pull/13572#discussion_r546464828



##########
File path: 
sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java
##########
@@ -243,37 +253,65 @@ private FieldValueTypeInformation 
fieldValueTypeInfo(Class<?> type, String field
   @Override
   public @NonNull SchemaUserTypeCreator schemaTypeCreator(
       @NonNull Class<?> targetClass, @NonNull Schema schema) {
-    return params -> restoreThriftObject(targetClass, params);
+    return thriftMapper(targetClass, schema);
+  }
+
+  private <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends TBase<T, 
FieldT>>
+      SchemaUserTypeCreator thriftMapper(Class<?> targetClass, Schema schema) {
+    final Map<FieldT, FieldMetaData> fieldDescriptors = 
schemaFieldDescriptors(targetClass, schema);
+    return params -> restoreThriftObject(targetClass, fieldDescriptors, 
params);
   }
 
+  @SuppressWarnings("nullness")
   private <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends TBase<T, 
FieldT>>
-      T restoreThriftObject(Class<?> targetClass, Object[] params) {
+      Map<FieldT, FieldMetaData> schemaFieldDescriptors(Class<?> targetClass, 
Schema schema) {
+    final Map<FieldT, FieldMetaData> fieldDescriptors = 
thriftFieldDescriptors(targetClass);
+    final Map<String, FieldT> fields =
+        fieldDescriptors.keySet().stream()
+            .collect(Collectors.toMap(FieldT::getFieldName, 
Function.identity()));
+
+    return schema.getFields().stream()
+        .map(Schema.Field::getName)
+        .map(fields::get)
+        .collect(
+            Collectors.toMap(
+                Function.identity(),
+                fieldDescriptors::get,
+                ThriftSchema::throwingCombiner,
+                LinkedHashMap::new));
+  }
+
+  private <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends TBase<T, 
FieldT>>
+      T restoreThriftObject(
+          Class<?> targetClass, Map<FieldT, FieldMetaData> fields, Object[] 
params) {
+    if (params.length != fields.size()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "The parameter list: %s does not match the expected fields: %s",
+              Arrays.toString(params), fields.keySet()));
+    }
     try {
       @SuppressWarnings("unchecked")
       final T thrift = (T) targetClass.getDeclaredConstructor().newInstance();
-      final Map<FieldT, FieldMetaData> fieldMap = 
thriftFieldDescriptors(targetClass);
-      // the underlying Map is an EnumMap, so it's safe to rely on the order 
of its keys
-      fieldMap.forEach(
-          (field, descriptor) ->
-              setThriftField(thrift, field, descriptor, 
params[field.ordinal()]));
+      final Iterator<Entry<FieldT, FieldMetaData>> iter = 
fields.entrySet().iterator();
+      Stream.of(params).forEach(param -> setThriftField(thrift, iter.next(), 
param));
       return thrift;
     } catch (Exception e) {
       throw new IllegalStateException(e);
     }
   }
 
   private <FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>> void 
setThriftField(
-      T thrift, FieldT field, FieldMetaData descriptor, Object value) {
+      T thrift, Entry<FieldT, FieldMetaData> fieldDescriptor, Object value) {
+    final FieldT field = fieldDescriptor.getKey();
+    final FieldMetaData descriptor = fieldDescriptor.getValue();
     if (value != null) {
       final Object actualValue;
       switch (descriptor.valueMetaData.type) {
         case TType.SET:
-          final Set<Object> set = new HashSet<>();
-          final Iterable<@NonNull ?> iterable = (Iterable<@NonNull ?>) value;
-          for (@NonNull Object elem : iterable) {
-            set.add(elem);
-          }
-          actualValue = set;
+          actualValue =
+              StreamSupport.stream(((Iterable<?>) value).spliterator(), false)
+                  .collect(Collectors.toSet());

Review comment:
       This wasn't possible before because of the "nullness" checks, but now it 
seems to work so I cleaned it up.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to