ccciudatu commented on a change in pull request #13572:
URL: https://github.com/apache/beam/pull/13572#discussion_r546464828
##########
File path:
sdks/java/io/thrift/src/main/java/org/apache/beam/sdk/io/thrift/ThriftSchema.java
##########
@@ -243,37 +253,65 @@ private FieldValueTypeInformation
fieldValueTypeInfo(Class<?> type, String field
@Override
public @NonNull SchemaUserTypeCreator schemaTypeCreator(
@NonNull Class<?> targetClass, @NonNull Schema schema) {
- return params -> restoreThriftObject(targetClass, params);
+ return thriftMapper(targetClass, schema);
+ }
+
+ private <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends TBase<T,
FieldT>>
+ SchemaUserTypeCreator thriftMapper(Class<?> targetClass, Schema schema) {
+ final Map<FieldT, FieldMetaData> fieldDescriptors =
schemaFieldDescriptors(targetClass, schema);
+ return params -> restoreThriftObject(targetClass, fieldDescriptors,
params);
}
+ @SuppressWarnings("nullness")
private <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends TBase<T,
FieldT>>
- T restoreThriftObject(Class<?> targetClass, Object[] params) {
+ Map<FieldT, FieldMetaData> schemaFieldDescriptors(Class<?> targetClass,
Schema schema) {
+ final Map<FieldT, FieldMetaData> fieldDescriptors =
thriftFieldDescriptors(targetClass);
+ final Map<String, FieldT> fields =
+ fieldDescriptors.keySet().stream()
+ .collect(Collectors.toMap(FieldT::getFieldName,
Function.identity()));
+
+ return schema.getFields().stream()
+ .map(Schema.Field::getName)
+ .map(fields::get)
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ fieldDescriptors::get,
+ ThriftSchema::throwingCombiner,
+ LinkedHashMap::new));
+ }
+
+ private <FieldT extends Enum<FieldT> & TFieldIdEnum, T extends TBase<T,
FieldT>>
+ T restoreThriftObject(
+ Class<?> targetClass, Map<FieldT, FieldMetaData> fields, Object[]
params) {
+ if (params.length != fields.size()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The parameter list: %s does not match the expected fields: %s",
+ Arrays.toString(params), fields.keySet()));
+ }
try {
@SuppressWarnings("unchecked")
final T thrift = (T) targetClass.getDeclaredConstructor().newInstance();
- final Map<FieldT, FieldMetaData> fieldMap =
thriftFieldDescriptors(targetClass);
- // the underlying Map is an EnumMap, so it's safe to rely on the order
of its keys
- fieldMap.forEach(
- (field, descriptor) ->
- setThriftField(thrift, field, descriptor,
params[field.ordinal()]));
+ final Iterator<Entry<FieldT, FieldMetaData>> iter =
fields.entrySet().iterator();
+ Stream.of(params).forEach(param -> setThriftField(thrift, iter.next(),
param));
return thrift;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
private <FieldT extends TFieldIdEnum, T extends TBase<T, FieldT>> void
setThriftField(
- T thrift, FieldT field, FieldMetaData descriptor, Object value) {
+ T thrift, Entry<FieldT, FieldMetaData> fieldDescriptor, Object value) {
+ final FieldT field = fieldDescriptor.getKey();
+ final FieldMetaData descriptor = fieldDescriptor.getValue();
if (value != null) {
final Object actualValue;
switch (descriptor.valueMetaData.type) {
case TType.SET:
- final Set<Object> set = new HashSet<>();
- final Iterable<@NonNull ?> iterable = (Iterable<@NonNull ?>) value;
- for (@NonNull Object elem : iterable) {
- set.add(elem);
- }
- actualValue = set;
+ actualValue =
+ StreamSupport.stream(((Iterable<?>) value).spliterator(), false)
+ .collect(Collectors.toSet());
Review comment:
This wasn't possible before because of the "nullness" checks, but now it
seems to work so I cleaned it up.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]