This is an automated email from the ASF dual-hosted git repository.
stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 66e543997f1 support generics in from row and to row conversions
(#37347)
66e543997f1 is described below
commit 66e543997f111d77810bcc6a0417ab612ac43277
Author: Maciej Szwaja <[email protected]>
AuthorDate: Mon Jun 8 12:20:20 2026 +0200
support generics in from row and to row conversions (#37347)
---
.../sdk/schemas/FieldValueTypeInformation.java | 13 +-
.../beam/sdk/schemas/FromRowUsingCreator.java | 6 +-
.../sdk/schemas/GetterBasedSchemaProvider.java | 98 +++--
.../beam/sdk/schemas/utils/AutoValueUtils.java | 9 +-
.../main/java/org/apache/beam/sdk/values/Row.java | 6 +-
.../org/apache/beam/sdk/values/RowWithGetters.java | 13 +-
.../beam/sdk/schemas/AutoValueSchemaTest.java | 426 ++++++++++++++++++++-
.../beam/sdk/schemas/JavaBeanSchemaTest.java | 167 +++++++-
.../beam/sdk/schemas/JavaFieldSchemaTest.java | 196 +++++++++-
.../beam/sdk/schemas/utils/TestJavaBeans.java | 35 ++
.../apache/beam/sdk/schemas/utils/TestPOJOs.java | 25 ++
.../beam/sdk/extensions/arrow/ArrowConversion.java | 4 +-
.../sdk/io/aws2/schemas/AwsSchemaProvider.java | 7 +-
.../apache/beam/sdk/io/aws2/schemas/AwsTypes.java | 16 +
14 files changed, 952 insertions(+), 69 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index 43aac6a5e20..95030eda098 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -105,7 +105,11 @@ public abstract class FieldValueTypeInformation implements
Serializable {
public abstract Builder setDescription(@Nullable String fieldDescription);
- abstract FieldValueTypeInformation build();
+ public abstract FieldValueTypeInformation build();
+ }
+
+ public static Builder builder() {
+ return new AutoValue_FieldValueTypeInformation.Builder();
}
public static FieldValueTypeInformation forOneOf(
@@ -311,7 +315,8 @@ public abstract class FieldValueTypeInformation implements
Serializable {
return toBuilder().setName(name).build();
}
- static @Nullable FieldValueTypeInformation
getIterableComponentType(TypeDescriptor<?> valueType) {
+ public static @Nullable FieldValueTypeInformation getIterableComponentType(
+ TypeDescriptor<?> valueType) {
// TODO: Figure out nullable elements.
TypeDescriptor<?> componentType =
ReflectUtils.getIterableComponentType(valueType);
if (componentType == null) {
@@ -331,13 +336,13 @@ public abstract class FieldValueTypeInformation
implements Serializable {
}
// If the type is a map type, returns the key type, otherwise returns a null
reference.
- private static @Nullable FieldValueTypeInformation getMapKeyType(
+ public static @Nullable FieldValueTypeInformation getMapKeyType(
TypeDescriptor<?> typeDescriptor) {
return getMapType(typeDescriptor, 0);
}
// If the type is a map type, returns the value type, otherwise returns a
null reference.
- private static @Nullable FieldValueTypeInformation getMapValueType(
+ public static @Nullable FieldValueTypeInformation getMapValueType(
TypeDescriptor<?> typeDescriptor) {
return getMapType(typeDescriptor, 1);
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 464dc00cec7..0f5c47f7259 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -82,10 +82,10 @@ class FromRowUsingCreator<T> implements
SerializableFunction<Row, T>, Function<R
return null;
}
if (row instanceof RowWithGetters) {
- Object target = ((RowWithGetters) row).getGetterTarget();
- if (target.getClass().equals(typeDescriptor.getRawType())) {
+ RowWithGetters rowWithGetters = (RowWithGetters) row;
+ if (rowWithGetters.getGetterTargetType().equals(typeDescriptor)) {
// Efficient path: simply extract the underlying object instead of
creating a new one.
- return (T) target;
+ return (T) rowWithGetters.getGetterTarget();
}
}
if (fieldConverters == null) {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index e08f193d407..7fb9e5a5dee 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -20,16 +20,20 @@ package org.apache.beam.sdk.schemas;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.LogicalType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -117,9 +121,11 @@ public abstract class GetterBasedSchemaProvider implements
SchemaProvider {
implements SerializableFunction<T, Row> {
private final Schema schema;
private final Factory<List<FieldValueGetter<T, Object>>> getterFactory;
+ private final TypeDescriptor getterTargetType;
- public ToRowWithValueGetters(Schema schema) {
+ public ToRowWithValueGetters(Schema schema, TypeDescriptor
getterTargetType) {
this.schema = schema;
+ this.getterTargetType = getterTargetType;
// Since we know that this factory is always called from inside the
lambda with the same
// schema, return a caching factory that caches the first value seen for
each class. This
// prevents having to lookup the getter list each time createGetters is
called.
@@ -128,13 +134,13 @@ public abstract class GetterBasedSchemaProvider
implements SchemaProvider {
(Factory<List<FieldValueGetter<T, Object>>>)
(typeDescriptor, schema1) ->
(List)
- GetterBasedSchemaProvider.this.fieldValueGetters(
- typeDescriptor, schema1));
+
GetterBasedSchemaProvider.this.fieldValueGetters(typeDescriptor, schema1),
+ GetterBasedSchemaProvider.this::fieldValueTypeInformations);
}
@Override
public Row apply(T input) {
- return Row.withSchema(schema).withFieldValueGetters(getterFactory,
input);
+ return Row.withSchema(schema).withFieldValueGetters(getterFactory,
input, getterTargetType);
}
private GetterBasedSchemaProvider getOuter() {
@@ -172,7 +178,7 @@ public abstract class GetterBasedSchemaProvider implements
SchemaProvider {
Verify.verifyNotNull(
schemaFor(typeDescriptor), "can't create a ToRowFunction with null
schema");
- return new ToRowWithValueGetters<>(schema);
+ return new ToRowWithValueGetters<>(schema, typeDescriptor);
}
@Override
@@ -194,16 +200,21 @@ public abstract class GetterBasedSchemaProvider
implements SchemaProvider {
private static class RowValueGettersFactory<T extends @NonNull Object>
implements Factory<List<FieldValueGetter<T, Object>>> {
private final Factory<List<FieldValueGetter<T, Object>>> gettersFactory;
+ private final Factory<List<FieldValueTypeInformation>> typeInfoFactory;
private final @NotOnlyInitialized Factory<List<FieldValueGetter<T,
Object>>>
cachingGettersFactory;
static <T extends @NonNull Object> Factory<List<FieldValueGetter<T,
Object>>> of(
- Factory<List<FieldValueGetter<T, Object>>> gettersFactory) {
- return new
RowValueGettersFactory<>(gettersFactory).cachingGettersFactory;
+ Factory<List<FieldValueGetter<T, Object>>> gettersFactory,
+ Factory<List<FieldValueTypeInformation>> typeInfoFactory) {
+ return new RowValueGettersFactory(gettersFactory,
typeInfoFactory).cachingGettersFactory;
}
- RowValueGettersFactory(Factory<List<FieldValueGetter<T, Object>>>
gettersFactory) {
+ RowValueGettersFactory(
+ Factory<List<FieldValueGetter<T, Object>>> gettersFactory,
+ Factory<List<FieldValueTypeInformation>> typeInfoFactory) {
this.gettersFactory = gettersFactory;
+ this.typeInfoFactory = typeInfoFactory;
this.cachingGettersFactory = new CachingFactory<>(this);
}
@@ -211,9 +222,17 @@ public abstract class GetterBasedSchemaProvider implements
SchemaProvider {
public List<FieldValueGetter<T, Object>> create(
TypeDescriptor<?> typeDescriptor, Schema schema) {
List<FieldValueGetter<T, Object>> getters =
gettersFactory.create(typeDescriptor, schema);
+ Map<String, FieldValueTypeInformation> typeInfoByName =
+ typeInfoFactory.create(typeDescriptor, schema).stream()
+ .collect(Collectors.toMap(FieldValueTypeInformation::getName,
Function.identity()));
List<FieldValueGetter<T, Object>> rowGetters = new
ArrayList<>(getters.size());
for (int i = 0; i < getters.size(); i++) {
- rowGetters.add(rowValueGetter(getters.get(i),
schema.getField(i).getType()));
+ FieldValueGetter getter = Verify.verifyNotNull(getters.get(i));
+ rowGetters.add(
+ rowValueGetter(
+ getter,
+ schema.getField(i).getType(),
+
Verify.verifyNotNull(typeInfoByName.get(getter.name())).getType()));
}
return rowGetters;
}
@@ -229,26 +248,49 @@ public abstract class GetterBasedSchemaProvider
implements SchemaProvider {
||
needsConversion(Verify.verifyNotNull(type.getMapValueType()))));
}
- FieldValueGetter<T, Object> rowValueGetter(FieldValueGetter base,
FieldType type) {
+ FieldValueGetter<T, Object> rowValueGetter(
+ FieldValueGetter base, FieldType type, @Nullable TypeDescriptor<?>
getterReturnType) {
TypeName typeName = type.getTypeName();
if (!needsConversion(type)) {
return base;
}
if (typeName.equals(TypeName.ROW)) {
- return new GetRow(base, Verify.verifyNotNull(type.getRowSchema()),
cachingGettersFactory);
- } else if (typeName.equals(TypeName.ARRAY)) {
+ return new GetRow(
+ base,
+ getterReturnType,
+ Verify.verifyNotNull(type.getRowSchema()),
+ cachingGettersFactory);
+ } else if (typeName.equals(TypeName.ARRAY) ||
typeName.equals(TypeName.ITERABLE)) {
FieldType elementType =
Verify.verifyNotNull(type.getCollectionElementType());
- return elementType.getTypeName().equals(TypeName.ROW)
- ? new GetEagerCollection(base, converter(elementType))
- : new GetCollection(base, converter(elementType));
- } else if (typeName.equals(TypeName.ITERABLE)) {
- return new GetIterable(
- base,
converter(Verify.verifyNotNull(type.getCollectionElementType())));
+ TypeDescriptor<?> elementTypeDescriptor =
+ Optional.ofNullable(getterReturnType)
+ .map(ReflectUtils::getIterableComponentType)
+ .orElse(null);
+ if (TypeName.ARRAY == typeName) {
+ return TypeName.ROW == elementType.getTypeName()
+ ? new GetEagerCollection(base, converter(elementType,
elementTypeDescriptor))
+ : new GetCollection(base, converter(elementType,
elementTypeDescriptor));
+ } else { // TypeName.ITERABLE
+ return new GetIterable(base, converter(elementType,
elementTypeDescriptor));
+ }
} else if (typeName.equals(TypeName.MAP)) {
+ @Nullable
+ TypeDescriptor[] resolvedKeyValueTypes =
+ Optional.ofNullable(getterReturnType)
+ .<@Nullable TypeDescriptor[]>map(
+ getterType ->
+ Arrays.stream(Map.class.getTypeParameters())
+ .<@Nullable TypeDescriptor>map(
+ typeVar -> {
+ TypeDescriptor resolved =
getterType.resolveType(typeVar);
+ return resolved.hasUnresolvedParameters() ?
null : resolved;
+ })
+ .<@Nullable
TypeDescriptor>toArray(TypeDescriptor[]::new))
+ .orElse(new TypeDescriptor[] {null, null});
return new GetMap(
base,
- converter(Verify.verifyNotNull(type.getMapKeyType())),
- converter(Verify.verifyNotNull(type.getMapValueType())));
+ converter(Verify.verifyNotNull(type.getMapKeyType()),
resolvedKeyValueTypes[0]),
+ converter(Verify.verifyNotNull(type.getMapValueType()),
resolvedKeyValueTypes[1]));
} else if (type.isLogicalType(OneOfType.IDENTIFIER)) {
OneOfType oneOfType = type.getLogicalType(OneOfType.class);
Schema oneOfSchema = oneOfType.getOneOfSchema();
@@ -258,7 +300,7 @@ public abstract class GetterBasedSchemaProvider implements
SchemaProvider {
Maps.newHashMapWithExpectedSize(values.size());
for (Map.Entry<String, Integer> kv : values.entrySet()) {
FieldType fieldType = oneOfSchema.getField(kv.getKey()).getType();
- FieldValueGetter<?, ?> converter = converter(fieldType);
+ FieldValueGetter<?, ?> converter = converter(fieldType, null);
converters.put(kv.getValue(), converter);
}
@@ -269,27 +311,35 @@ public abstract class GetterBasedSchemaProvider
implements SchemaProvider {
return base;
}
- FieldValueGetter<?, ?> converter(FieldType type) {
- return rowValueGetter(IDENTITY, type);
+ FieldValueGetter<?, ?> converter(FieldType type, @Nullable
TypeDescriptor<?> getterReturnType) {
+ return rowValueGetter(IDENTITY, type, getterReturnType);
}
static class GetRow<T extends @NonNull Object, V extends @NonNull Object>
extends Converter<T, V> {
final Schema schema;
final Factory<List<FieldValueGetter<V, Object>>> factory;
+ final @Nullable TypeDescriptor<?> valueType;
GetRow(
FieldValueGetter<T, V> getter,
+ @Nullable TypeDescriptor<?> getterReturnType,
Schema schema,
Factory<List<FieldValueGetter<V, Object>>> factory) {
super(getter);
this.schema = schema;
this.factory = factory;
+ this.valueType = getterReturnType;
}
@Override
Object convert(V value) {
- return Row.withSchema(schema).withFieldValueGetters(factory, value);
+ return Row.withSchema(schema)
+ .withFieldValueGetters(
+ factory,
+ value,
+ Optional.ofNullable(valueType)
+ .orElse((TypeDescriptor)
TypeDescriptor.of(value.getClass())));
}
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
index 78808fdc10c..a9353bcaaac 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AutoValueUtils.java
@@ -162,7 +162,7 @@ public class AutoValueUtils {
Optional<Constructor<?>> constructor =
Arrays.stream(generatedTypeDescriptor.getRawType().getDeclaredConstructors())
.filter(c -> !Modifier.isPrivate(c.getModifiers()))
- .filter(c -> matchConstructor(c, schemaTypes))
+ .filter(c -> matchConstructor(generatedTypeDescriptor, c,
schemaTypes))
.findAny();
return constructor
.map(
@@ -177,7 +177,9 @@ public class AutoValueUtils {
}
private static boolean matchConstructor(
- Constructor<?> constructor, List<FieldValueTypeInformation> getterTypes)
{
+ TypeDescriptor typeDescriptor,
+ Constructor<?> constructor,
+ List<FieldValueTypeInformation> getterTypes) {
if (constructor.getParameters().length != getterTypes.size()) {
return false;
}
@@ -197,7 +199,8 @@ public class AutoValueUtils {
// Verify that constructor parameters match (name and type) the inferred
schema.
for (Parameter parameter : constructor.getParameters()) {
FieldValueTypeInformation type = typeMap.get(parameter.getName());
- if (type == null || type.getRawType() != parameter.getType()) {
+ if (type == null
+ ||
!type.getType().equals(typeDescriptor.resolveType(parameter.getParameterizedType())))
{
valid = false;
break;
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 11d02be46d2..6eb063f84b6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -838,9 +838,11 @@ public abstract class Row implements Serializable {
@Internal
public <T> Row withFieldValueGetters(
- Factory<List<FieldValueGetter<T, Object>>> fieldValueGetterFactory, T
getterTarget) {
+ Factory<List<FieldValueGetter<T, Object>>> fieldValueGetterFactory,
+ T getterTarget,
+ TypeDescriptor<?> getterTargetType) {
checkState(getterTarget != null, "getters require withGetterTarget.");
- return new RowWithGetters<>(schema, fieldValueGetterFactory,
getterTarget);
+ return new RowWithGetters<>(schema, fieldValueGetterFactory,
getterTarget, getterTargetType);
}
public Row build() {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
index 35e0ac20d3f..0cbc7bfb992 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowWithGetters.java
@@ -44,14 +44,19 @@ import org.checkerframework.checker.nullness.qual.Nullable;
@SuppressWarnings("rawtypes")
public class RowWithGetters<T extends @NonNull Object> extends Row {
private final T getterTarget;
+ private final TypeDescriptor<?> getterTargetType;
private final List<FieldValueGetter<T, Object>> getters;
private @Nullable Map<Integer, @Nullable Object> cache = null;
RowWithGetters(
- Schema schema, Factory<List<FieldValueGetter<T, Object>>> getterFactory,
T getterTarget) {
+ Schema schema,
+ Factory<List<FieldValueGetter<T, Object>>> getterFactory,
+ T getterTarget,
+ TypeDescriptor<?> getterTargetType) {
super(schema);
this.getterTarget = getterTarget;
- this.getters =
getterFactory.create(TypeDescriptor.of(getterTarget.getClass()), schema);
+ this.getterTargetType = getterTargetType;
+ this.getters = getterFactory.create(getterTargetType, schema);
}
@Override
@@ -90,6 +95,10 @@ public class RowWithGetters<T extends @NonNull Object>
extends Row {
return (W) fieldValue;
}
+ public TypeDescriptor<?> getGetterTargetType() {
+ return getterTargetType;
+ }
+
private boolean cacheFieldType(Field field) {
TypeName typeName = field.getType().getTypeName();
return typeName.equals(TypeName.MAP)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
index f4700212511..99b858a4e30 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AutoValueSchemaTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas;
+import static
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.assertSchemaEquivalent;
import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
@@ -28,6 +29,8 @@ import com.google.auto.value.extension.memoized.Memoized;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -37,9 +40,13 @@ import
org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Test;
@@ -70,7 +77,7 @@ public class AutoValueSchemaTest {
.build();
static final Schema OUTER_SCHEMA = Schema.builder().addRowField("inner",
SIMPLE_SCHEMA).build();
- private Row createSimpleRow(String name) {
+ private static Row createSimpleRow(String name) {
return Row.withSchema(SIMPLE_SCHEMA)
.addValues(
name,
@@ -348,6 +355,50 @@ public class AutoValueSchemaTest {
}
}
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ abstract static class GenericAutoValue<T> {
+ public abstract T getT();
+
+ GenericAutoValue() {}
+
+ public static <T> GenericAutoValue<T> create(T t) {
+ return new AutoValue_AutoValueSchemaTest_GenericAutoValue<>(t);
+ }
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ abstract static class GenericAutoValueWithBuilder<T> {
+ public abstract T getT();
+
+ GenericAutoValueWithBuilder() {}
+
+ public static <T> Builder<T> builder() {
+ return new
AutoValue_AutoValueSchemaTest_GenericAutoValueWithBuilder.Builder<>();
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ public abstract Builder<T> setT(T t);
+
+ public abstract GenericAutoValueWithBuilder<T> build();
+ }
+ }
+
+ @DefaultSchema(AutoValueSchema.class)
+ @AutoValue
+ abstract static class GenericAutoValueWithCreator<T> {
+ public abstract T getT();
+
+ GenericAutoValueWithCreator() {}
+
+ @SchemaCreate
+ public static <T> GenericAutoValueWithCreator<T> create(T t) {
+ return new
AutoValue_AutoValueSchemaTest_GenericAutoValueWithCreator<>(t);
+ }
+ }
+
private void verifyRow(Row row) {
assertEquals("string", row.getString("str"));
assertEquals((byte) 1, (Object) row.getByte("aByte"));
@@ -385,6 +436,375 @@ public class AutoValueSchemaTest {
SchemaTestUtils.assertSchemaEquivalent(SIMPLE_SCHEMA, schema);
}
+ @Test
+ public void testGenericAutoValueSchema() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema actual = registry.getSchema(new
TypeDescriptor<GenericAutoValue<SimpleSchema>>() {});
+ Schema expected = Schema.builder().addRowField("t", SIMPLE_SCHEMA).build();
+ assertSchemaEquivalent(expected, actual);
+ }
+
+ @Test
+ public void testNestedGenericAutoValueSchema() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema actual =
+ registry.getSchema(
+ new
TypeDescriptor<GenericAutoValue<GenericAutoValue<SimpleSchema>>>() {});
+ Schema expected =
+ Schema.builder()
+ .addRowField("t", Schema.builder().addRowField("t",
SIMPLE_SCHEMA).build())
+ .build();
+
+ assertSchemaEquivalent(expected, actual);
+ }
+
+ @Test
+ public void testGenericAutoValueToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<GenericAutoValue<SimpleSchema>, Row> toRow =
+ registry.getToRowFunction(new
TypeDescriptor<GenericAutoValue<SimpleSchema>>() {});
+ Row row =
+ toRow.apply(
+ GenericAutoValue.create(
+ new AutoValue_AutoValueSchemaTest_SimpleAutoValue(
+ "string",
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ true,
+ DATE,
+ BYTE_ARRAY,
+ ByteBuffer.wrap(BYTE_ARRAY),
+ DATE.toInstant(),
+ BigDecimal.ONE,
+ STRING_BUILDER)));
+
+ verifyRow(row.getRow("t"));
+ }
+
+ @Test
+ public void testGenericAutoValueFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row, GenericAutoValue<SimpleAutoValue>> fromRow =
+ registry.getFromRowFunction(new
TypeDescriptor<GenericAutoValue<SimpleAutoValue>>() {});
+
+ Row row =
+ Row.withSchema(Schema.builder().addRowField("t",
SIMPLE_SCHEMA).build())
+ .withFieldValue("t", createSimpleRow("string"))
+ .build();
+ GenericAutoValue<SimpleAutoValue> actual = fromRow.apply(row);
+ verifyAutoValue(actual.getT());
+ }
+
+ @Test
+ public void testGenericAutoValueWithCreatorFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row, GenericAutoValueWithCreator<SimpleAutoValue>>
fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<GenericAutoValueWithCreator<SimpleAutoValue>>()
{});
+
+ Row row =
+ Row.withSchema(Schema.builder().addRowField("t",
SIMPLE_SCHEMA).build())
+ .withFieldValue("t", createSimpleRow("string"))
+ .build();
+ GenericAutoValueWithCreator<SimpleAutoValue> actual = fromRow.apply(row);
+ verifyAutoValue(actual.getT());
+ }
+
+ @Test
+ public void testGenericAutoValueWithBuilderFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row, GenericAutoValueWithBuilder<SimpleAutoValue>>
fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<GenericAutoValueWithBuilder<SimpleAutoValue>>()
{});
+
+ Row row =
+ Row.withSchema(Schema.builder().addRowField("t",
SIMPLE_SCHEMA).build())
+ .withFieldValue("t", createSimpleRow("string"))
+ .build();
+ GenericAutoValueWithBuilder<SimpleAutoValue> actual = fromRow.apply(row);
+ verifyAutoValue(actual.getT());
+ }
+
+ @Test
+ public void testGenericAutoValueBuilderOfMapOfCreatorsFromRow() throws
Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<
+ Row, GenericAutoValueWithBuilder<Map<String,
GenericAutoValueWithCreator<String>>>>
+ fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<
+ GenericAutoValueWithBuilder<
+ Map<String, GenericAutoValueWithCreator<String>>>>()
{});
+
+ Schema mapValueSchema = Schema.builder().addField("t",
FieldType.STRING).build();
+
+ Row row =
+ Row.withSchema(
+ Schema.builder()
+ .addMapField("t", FieldType.STRING,
FieldType.row(mapValueSchema))
+ .build())
+ .withFieldValue(
+ "t",
+ ImmutableMap.<String, Row>builder()
+ .put("k1",
Row.withSchema(mapValueSchema).withFieldValue("t", "v1").build())
+ .put("k2",
Row.withSchema(mapValueSchema).withFieldValue("t", "v2").build())
+ .build())
+ .build();
+
+ GenericAutoValueWithBuilder<Map<String,
GenericAutoValueWithCreator<String>>> actual =
+ fromRow.apply(row);
+ GenericAutoValueWithCreator<String> genericAutoValue1 =
+ GenericAutoValueWithCreator.create("v1");
+ GenericAutoValueWithCreator<String> genericAutoValue2 =
+ GenericAutoValueWithCreator.create("v2");
+
+ assertEquals(genericAutoValue1, actual.getT().get("k1"));
+ assertEquals(genericAutoValue2, actual.getT().get("k2"));
+ }
+
+ @Test
+ public void testGenericAutoValueCreatorOfMapOfBuildersFromRow() throws
Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<
+ Row, GenericAutoValueWithCreator<Map<String,
GenericAutoValueWithBuilder<String>>>>
+ fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<
+ GenericAutoValueWithCreator<
+ Map<String, GenericAutoValueWithBuilder<String>>>>()
{});
+
+ Schema mapValueSchema = Schema.builder().addField("t",
FieldType.STRING).build();
+
+ Row row =
+ Row.withSchema(
+ Schema.builder()
+ .addMapField("t", FieldType.STRING,
FieldType.row(mapValueSchema))
+ .build())
+ .withFieldValue(
+ "t",
+ ImmutableMap.<String, Row>builder()
+ .put("k1",
Row.withSchema(mapValueSchema).withFieldValue("t", "v1").build())
+ .put("k2",
Row.withSchema(mapValueSchema).withFieldValue("t", "v2").build())
+ .build())
+ .build();
+
+ GenericAutoValueWithCreator<Map<String,
GenericAutoValueWithBuilder<String>>> actual =
+ fromRow.apply(row);
+ GenericAutoValueWithBuilder<String> genericAutoValue1 =
+ GenericAutoValueWithBuilder.<String>builder().setT("v1").build();
+ GenericAutoValueWithBuilder<String> genericAutoValue2 =
+ GenericAutoValueWithBuilder.<String>builder().setT("v2").build();
+
+ assertEquals(genericAutoValue1, actual.getT().get("k1"));
+ assertEquals(genericAutoValue2, actual.getT().get("k2"));
+ }
+
+ @Test
+ public void testGenericAutoValueBuilderOfListOfCreatorsFromRow() throws
Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<
+ Row,
GenericAutoValueWithBuilder<List<GenericAutoValueWithCreator<String>>>>
+ fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<
+
GenericAutoValueWithBuilder<List<GenericAutoValueWithCreator<String>>>>() {});
+
+ Schema listElementSchema = Schema.builder().addField("t",
FieldType.STRING).build();
+
+ Row row =
+ Row.withSchema(
+ Schema.builder().addArrayField("t",
FieldType.row(listElementSchema)).build())
+ .withFieldValue(
+ "t",
+ ImmutableList.<Row>builder()
+ .add(Row.withSchema(listElementSchema).withFieldValue("t",
"v1").build())
+ .add(Row.withSchema(listElementSchema).withFieldValue("t",
"v2").build())
+ .build())
+ .build();
+
+ GenericAutoValueWithBuilder<List<GenericAutoValueWithCreator<String>>>
actual =
+ fromRow.apply(row);
+ GenericAutoValueWithCreator<String> genericAutoValue1 =
+ GenericAutoValueWithCreator.create("v1");
+ GenericAutoValueWithCreator<String> genericAutoValue2 =
+ GenericAutoValueWithCreator.create("v2");
+
+ assertEquals(genericAutoValue1, actual.getT().get(0));
+ assertEquals(genericAutoValue2, actual.getT().get(1));
+ }
+
+ @Test
+ public void testGenericAutoValueCreatorOfListOfBuildersFromRow() throws
Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<
+ Row,
GenericAutoValueWithCreator<List<GenericAutoValueWithBuilder<String>>>>
+ fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<
+
GenericAutoValueWithCreator<List<GenericAutoValueWithBuilder<String>>>>() {});
+
+ Schema listElementSchema = Schema.builder().addField("t",
FieldType.STRING).build();
+
+ Row row =
+ Row.withSchema(
+ Schema.builder().addArrayField("t",
FieldType.row(listElementSchema)).build())
+ .withFieldValue(
+ "t",
+ ImmutableList.<Row>builder()
+ .add(Row.withSchema(listElementSchema).withFieldValue("t",
"v1").build())
+ .add(Row.withSchema(listElementSchema).withFieldValue("t",
"v2").build())
+ .build())
+ .build();
+
+ GenericAutoValueWithCreator<List<GenericAutoValueWithBuilder<String>>>
actual =
+ fromRow.apply(row);
+ GenericAutoValueWithBuilder<String> genericAutoValue1 =
+ GenericAutoValueWithBuilder.<String>builder().setT("v1").build();
+ GenericAutoValueWithBuilder<String> genericAutoValue2 =
+ GenericAutoValueWithBuilder.<String>builder().setT("v2").build();
+
+ assertEquals(genericAutoValue1, actual.getT().get(0));
+ assertEquals(genericAutoValue2, actual.getT().get(1));
+ }
+
+ @Test
+ public void testGenericAutoValueBuilderOfArrayOfCreatorsFromRow() throws
Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row,
GenericAutoValueWithBuilder<GenericAutoValueWithCreator<String>[]>>
+ fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<
+
GenericAutoValueWithBuilder<GenericAutoValueWithCreator<String>[]>>() {});
+
+ Schema arrayElementSchema = Schema.builder().addField("t",
FieldType.STRING).build();
+
+ Row row =
+ Row.withSchema(
+ Schema.builder().addArrayField("t",
FieldType.row(arrayElementSchema)).build())
+ .withFieldValue(
+ "t",
+ ImmutableList.<Row>builder()
+
.add(Row.withSchema(arrayElementSchema).withFieldValue("t", "v1").build())
+
.add(Row.withSchema(arrayElementSchema).withFieldValue("t", "v2").build())
+ .build())
+ .build();
+
+ GenericAutoValueWithBuilder<GenericAutoValueWithCreator<String>[]> actual
= fromRow.apply(row);
+ GenericAutoValueWithCreator<String> genericAutoValue1 =
+ GenericAutoValueWithCreator.create("v1");
+ GenericAutoValueWithCreator<String> genericAutoValue2 =
+ GenericAutoValueWithCreator.create("v2");
+
+ assertEquals(genericAutoValue1, actual.getT()[0]);
+ assertEquals(genericAutoValue2, actual.getT()[1]);
+ }
+
+ @Test
+ public void testGenericAutoValueCreatorOfArrayOfBuildersFromRow() throws
Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<Row,
GenericAutoValueWithCreator<GenericAutoValueWithBuilder<String>[]>>
+ fromRow =
+ registry.getFromRowFunction(
+ new TypeDescriptor<
+
GenericAutoValueWithCreator<GenericAutoValueWithBuilder<String>[]>>() {});
+
+ Schema arrayElementSchema = Schema.builder().addField("t",
FieldType.STRING).build();
+
+ Row row =
+ Row.withSchema(
+ Schema.builder().addArrayField("t",
FieldType.row(arrayElementSchema)).build())
+ .withFieldValue(
+ "t",
+ ImmutableList.<Row>builder()
+
.add(Row.withSchema(arrayElementSchema).withFieldValue("t", "v1").build())
+
.add(Row.withSchema(arrayElementSchema).withFieldValue("t", "v2").build())
+ .build())
+ .build();
+
+ GenericAutoValueWithCreator<GenericAutoValueWithBuilder<String>[]> actual
= fromRow.apply(row);
+ GenericAutoValueWithBuilder<String> genericAutoValue1 =
+ GenericAutoValueWithBuilder.<String>builder().setT("v1").build();
+ GenericAutoValueWithBuilder<String> genericAutoValue2 =
+ GenericAutoValueWithBuilder.<String>builder().setT("v2").build();
+
+ assertEquals(genericAutoValue1, actual.getT()[0]);
+ assertEquals(genericAutoValue2, actual.getT()[1]);
+ }
+
+ @Test
+ public void testGenericAutoValueWithMapToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<GenericAutoValue<Map<String,
GenericAutoValue<String>>>, Row> toRow =
+ registry.getToRowFunction(
+ new TypeDescriptor<GenericAutoValue<Map<String,
GenericAutoValue<String>>>>() {});
+
+ GenericAutoValue<String> genericAutoValue1 = GenericAutoValue.create("v1");
+ GenericAutoValue<String> genericAutoValue2 = GenericAutoValue.create("v2");
+
+ Row row =
+ toRow.apply(
+ GenericAutoValue.create(
+ ImmutableMap.of("k1", genericAutoValue1, "k2",
genericAutoValue2)));
+
+ assertEquals("v1", row.<String, Row>getMap("t").get("k1").getString("t"));
+ assertEquals("v2", row.<String, Row>getMap("t").get("k2").getString("t"));
+ }
+
+ @Test
+ public void testGenericAutoValueWithListToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<GenericAutoValue<List<GenericAutoValue<String>>>,
Row> toRow =
+ registry.getToRowFunction(
+ new
TypeDescriptor<GenericAutoValue<List<GenericAutoValue<String>>>>() {});
+
+ GenericAutoValue<String> genericAutoValue1 = GenericAutoValue.create("v1");
+ GenericAutoValue<String> genericAutoValue2 = GenericAutoValue.create("v2");
+
+ Row row =
+ toRow.apply(
+ GenericAutoValue.create(ImmutableList.of(genericAutoValue1,
genericAutoValue2)));
+ Row[] genericAutoValueRows = row.<Row>getArray("t").toArray(new Row[0]);
+
+ assertEquals("v1", genericAutoValueRows[0].getString("t"));
+ assertEquals("v2", genericAutoValueRows[1].getString("t"));
+ }
+
+ @Test
+ public void testGenericAutoValueWithArrayToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SerializableFunction<GenericAutoValue<GenericAutoValue<String>[]>, Row>
toRow =
+ registry.getToRowFunction(
+ new TypeDescriptor<GenericAutoValue<GenericAutoValue<String>[]>>()
{});
+
+ GenericAutoValue<String> genericAutoValue1 = GenericAutoValue.create("v1");
+ GenericAutoValue<String> genericAutoValue2 = GenericAutoValue.create("v2");
+
+ @SuppressWarnings("unchecked")
+ Row row =
+ toRow.apply(
+ GenericAutoValue.create(new GenericAutoValue[] {genericAutoValue1,
genericAutoValue2}));
+ Row[] genericAutoValueRows = row.<Row>getArray("t").toArray(new Row[0]);
+
+ assertEquals("v1", genericAutoValueRows[0].getString("t"));
+ assertEquals("v2", genericAutoValueRows[1].getString("t"));
+ }
+
+ @Test
+ public void testNestedGenericAutoValueToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+
SerializableFunction<GenericAutoValue<GenericAutoValue<GenericAutoValue<String>>>,
Row> toRow =
+ registry.getToRowFunction(
+ new
TypeDescriptor<GenericAutoValue<GenericAutoValue<GenericAutoValue<String>>>>()
{});
+
+ Row row =
+ toRow.apply(
+
GenericAutoValue.create(GenericAutoValue.create(GenericAutoValue.create("v1"))));
+
+ assertEquals("v1", row.getRow("t").getRow("t").getString("t"));
+ }
+
@Test
public void testToRowConstructor() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -402,6 +822,7 @@ public class AutoValueSchemaTest {
DATE.toInstant(),
BigDecimal.ONE,
STRING_BUILDER);
+
Row row = registry.getToRowFunction(SimpleAutoValue.class).apply(value);
verifyRow(row);
}
@@ -444,6 +865,7 @@ public class AutoValueSchemaTest {
DATE.toInstant(),
BigDecimal.ONE,
STRING_BUILDER);
+
Row row = registry.getToRowFunction(MemoizedAutoValue.class).apply(value);
verifyRow(row);
}
@@ -571,6 +993,7 @@ public class AutoValueSchemaTest {
DATE.toInstant(),
BigDecimal.ONE,
STRING_BUILDER);
+
AutoValueOuter outer = new
AutoValue_AutoValueSchemaTest_AutoValueOuter(inner);
Row row = registry.getToRowFunction(AutoValueOuter.class).apply(outer);
verifyRow(row.getRow("inner"));
@@ -675,6 +1098,7 @@ public class AutoValueSchemaTest {
Instant instant,
BigDecimal bigDecimal,
StringBuilder stringBuilder) {
+
return new
AutoValue_AutoValueSchemaTest_SimpleAutoValueWithStaticFactory(
str,
aByte,
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
index de0953a0a08..21c6dee1369 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaBeanSchemaTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas;
+import static
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.assertSchemaEquivalent;
import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ALL_NULLABLE_BEAN_SCHEMA;
import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ANNOTATED_SIMPLE_BEAN_SCHEMA;
@@ -33,6 +34,7 @@ import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.PARAMETER_NULLABLE
import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.PRIMITIVE_ARRAY_BEAN_SCHEMA;
import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.RENAMED_FIELDS_AND_SETTERS_BEAM_SCHEMA;
import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.SIMPLE_BEAN_SCHEMA;
+import static
org.apache.beam.sdk.schemas.utils.TestJavaBeans.genericBeanSchema;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -63,6 +65,7 @@ import
org.apache.beam.sdk.schemas.utils.TestJavaBeans.ArrayOfByteArray;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithCaseFormat;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithNoCreateOption;
import
org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithRenamedFieldsAndSetters;
+import org.apache.beam.sdk.schemas.utils.TestJavaBeans.GenericBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.IterableBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.JavaTimeBean;
import org.apache.beam.sdk.schemas.utils.TestJavaBeans.MismatchingNullableBean;
@@ -76,9 +79,11 @@ import
org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
import
org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBeanWithAnnotations;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
import org.joda.time.DateTime;
import org.junit.Ignore;
@@ -139,6 +144,16 @@ public class JavaBeanSchemaTest {
.build();
}
+ private <T> GenericBean<T> createGeneric(T t) {
+ GenericBean<T> genericBean = new GenericBean<>();
+ genericBean.setT(t);
+ return genericBean;
+ }
+
+ private Row createGenericRow(Schema.FieldType tFieldType, Object
tFieldValue) {
+ return Row.withSchema(genericBeanSchema(tFieldType)).withFieldValue("t",
tFieldValue).build();
+ }
+
@Test
public void testSchema() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -146,14 +161,9 @@ public class JavaBeanSchemaTest {
SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
}
- @Test
- public void testToRow() throws NoSuchSchemaException {
- SchemaRegistry registry = SchemaRegistry.createDefault();
- SimpleBean bean = createSimple("string");
- Row row = registry.getToRowFunction(SimpleBean.class).apply(bean);
-
+ private static void verifyRow(String expectedStrField, Row row) {
assertEquals(12, row.getFieldCount());
- assertEquals("string", row.getString("str"));
+ assertEquals(expectedStrField, row.getString("str"));
assertEquals((byte) 1, (Object) row.getByte("aByte"));
assertEquals((short) 2, (Object) row.getInt16("aShort"));
assertEquals((int) 3, (Object) row.getInt32("anInt"));
@@ -167,13 +177,8 @@ public class JavaBeanSchemaTest {
assertEquals("stringbuilder", row.getString("stringBuilder"));
}
- @Test
- public void testFromRow() throws NoSuchSchemaException {
- SchemaRegistry registry = SchemaRegistry.createDefault();
- Row row = createSimpleRow("string");
-
- SimpleBean bean = registry.getFromRowFunction(SimpleBean.class).apply(row);
- assertEquals("string", bean.getStr());
+ private static void verifySimpleBean(String expectedStrField, SimpleBean
bean) {
+ assertEquals(expectedStrField, bean.getStr());
assertEquals((byte) 1, bean.getaByte());
assertEquals((short) 2, bean.getaShort());
assertEquals((int) 3, bean.getAnInt());
@@ -262,6 +267,23 @@ public class JavaBeanSchemaTest {
assertEquals(original, roundTripped);
}
+ @Test
+ public void testToRow() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SimpleBean bean = createSimple("string");
+ Row row = registry.getToRowFunction(SimpleBean.class).apply(bean);
+ verifyRow("string", row);
+ }
+
+ @Test
+ public void testFromRow() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row = createSimpleRow("string");
+
+ SimpleBean bean = registry.getFromRowFunction(SimpleBean.class).apply(row);
+ verifySimpleBean("string", bean);
+ }
+
@Test
public void testNullableToRow() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -708,4 +730,121 @@ public class JavaBeanSchemaTest {
assertEquals(
registry.getFromRowFunction(BeanWithCaseFormat.class).apply(row),
beanWithCaseFormat);
}
+
+ @Test
+ public void testGenericBeamSchema() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema actual = registry.getSchema(new
TypeDescriptor<GenericBean<SimpleBean>>() {});
+ Schema expected =
genericBeanSchema(Schema.FieldType.row(SIMPLE_BEAN_SCHEMA));
+
+ assertSchemaEquivalent(expected, actual);
+ }
+
+ @Test
+ public void testGenericBeamSchemaToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ GenericBean<GenericBean<SimpleBean>> genericBean =
+ createGeneric(createGeneric(createSimple("string")));
+
+ Row row =
+ registry
+ .getToRowFunction(new
TypeDescriptor<GenericBean<GenericBean<SimpleBean>>>() {})
+ .apply(genericBean);
+
+ verifyRow("string", row.getRow("t").getRow("t"));
+ }
+
+ @Test
+ public void testGenericBeamSchemaFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema nestedSchema =
genericBeanSchema(Schema.FieldType.row(SIMPLE_BEAN_SCHEMA));
+ Row row =
+ createGenericRow(
+ Schema.FieldType.row(nestedSchema),
+ createGenericRow(Schema.FieldType.row(SIMPLE_BEAN_SCHEMA),
createSimpleRow("string")));
+ GenericBean<GenericBean<SimpleBean>> actual =
+ registry
+ .getFromRowFunction(new
TypeDescriptor<GenericBean<GenericBean<SimpleBean>>>() {})
+ .apply(row);
+
+ verifySimpleBean("string", actual.getT().getT());
+ }
+
+ @Test
+ public void testGenericBeamSchemaMapToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row =
+ registry
+ .getToRowFunction(
+ new TypeDescriptor<GenericBean<Map<String,
GenericBean<String>>>>() {})
+ .apply(
+ createGeneric(
+ ImmutableMap.<String, GenericBean<String>>builder()
+ .put("k1", createGeneric("v1"))
+ .put("k2", createGeneric("v2"))
+ .build()));
+
+ assertEquals("v1", row.<String, Row>getMap("t").get("k1").getString("t"));
+ assertEquals("v2", row.<String, Row>getMap("t").get("k2").getString("t"));
+ }
+
+ @Test
+ public void testGenericBeamSchemaMapFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema.FieldType mapValueFieldType =
+ Schema.FieldType.row(genericBeanSchema(Schema.FieldType.STRING));
+ GenericBean<Map<String, GenericBean<String>>> actual =
+ registry
+ .getFromRowFunction(
+ new TypeDescriptor<GenericBean<Map<String,
GenericBean<String>>>>() {})
+ .apply(
+ createGenericRow(
+ Schema.FieldType.map(Schema.FieldType.STRING,
mapValueFieldType),
+ ImmutableMap.<String, Row>builder()
+ .put("k1", createGenericRow(Schema.FieldType.STRING,
"v1"))
+ .put("k2", createGenericRow(Schema.FieldType.STRING,
"v2"))
+ .build()));
+
+ assertEquals("v1", actual.getT().get("k1").getT());
+ assertEquals("v2", actual.getT().get("k2").getT());
+ }
+
+ @Test
+ public void testGenericBeamSchemaIterableToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row =
+ registry
+ .getToRowFunction(new
TypeDescriptor<GenericBean<Iterable<GenericBean<String>>>>() {})
+ .apply(
+ createGeneric(
+ ImmutableList.<GenericBean<String>>builder()
+ .add(createGeneric("v1"))
+ .add(createGeneric("v2"))
+ .build()));
+
+ Row[] rows = Streams.stream(row.<Row>getIterable("t")).toArray(Row[]::new);
+
+ assertEquals("v1", rows[0].getString("t"));
+ assertEquals("v2", rows[1].getString("t"));
+ }
+
+ @Test
+ public void testGenericBeamSchemaIterableFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema.FieldType elementFieldType =
+ Schema.FieldType.row(genericBeanSchema(Schema.FieldType.STRING));
+ GenericBean<Iterable<GenericBean<String>>> actual =
+ registry
+ .getFromRowFunction(new
TypeDescriptor<GenericBean<Iterable<GenericBean<String>>>>() {})
+ .apply(
+ createGenericRow(
+ Schema.FieldType.array(elementFieldType),
+ ImmutableList.<Row>builder()
+ .add(createGenericRow(Schema.FieldType.STRING, "v1"))
+ .add(createGenericRow(Schema.FieldType.STRING, "v2"))
+ .build()));
+ GenericBean<String>[] beans =
Streams.stream(actual.getT()).toArray(GenericBean[]::new);
+ assertEquals("v1", beans[0].getT());
+ assertEquals("v2", beans[1].getT());
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
index c80b758adc3..d4f94f021c3 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/JavaFieldSchemaTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.schemas;
+import static
org.apache.beam.sdk.schemas.utils.SchemaTestUtils.assertSchemaEquivalent;
import static org.apache.beam.sdk.schemas.utils.SchemaTestUtils.equivalentTo;
import static
org.apache.beam.sdk.schemas.utils.TestPOJOs.ANNOTATED_SIMPLE_POJO_SCHEMA;
import static
org.apache.beam.sdk.schemas.utils.TestPOJOs.CASE_FORMAT_POJO_SCHEMA;
@@ -36,6 +37,7 @@ import static
org.apache.beam.sdk.schemas.utils.TestPOJOs.POJO_WITH_NESTED_ARRAY
import static
org.apache.beam.sdk.schemas.utils.TestPOJOs.PRIMITIVE_ARRAY_POJO_SCHEMA;
import static org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_SCHEMA;
import static
org.apache.beam.sdk.schemas.utils.TestPOJOs.SIMPLE_POJO_WITH_DESCRIPTION_SCHEMA;
+import static org.apache.beam.sdk.schemas.utils.TestPOJOs.genericPOJOSchema;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
@@ -64,6 +66,7 @@ import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
import org.apache.beam.sdk.schemas.utils.TestPOJOs;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.AnnotatedSimplePojo;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.FirstCircularNestedPOJO;
+import org.apache.beam.sdk.schemas.utils.TestPOJOs.GenericPOJO;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.JavaTimePOJO;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArraysPOJO;
@@ -86,9 +89,11 @@ import
org.apache.beam.sdk.schemas.utils.TestPOJOs.StaticCreationSimplePojo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Ints;
import org.joda.time.DateTime;
import org.joda.time.Instant;
@@ -190,6 +195,10 @@ public class JavaFieldSchemaTest {
.build();
}
+ private static Row createGenericRow(FieldType tFieldType, Object
tFieldValue) {
+ return Row.withSchema(genericPOJOSchema(tFieldType)).withFieldValue("t",
tFieldValue).build();
+ }
+
@Test
public void testSchema() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -197,14 +206,9 @@ public class JavaFieldSchemaTest {
SchemaTestUtils.assertSchemaEquivalent(SIMPLE_POJO_SCHEMA, schema);
}
- @Test
- public void testToRow() throws NoSuchSchemaException {
- SchemaRegistry registry = SchemaRegistry.createDefault();
- SimplePOJO pojo = createSimple("string");
- Row row = registry.getToRowFunction(SimplePOJO.class).apply(pojo);
-
+ private static void verifySimpleRow(String expectedStrField, Row row) {
assertEquals(12, row.getFieldCount());
- assertEquals("string", row.getString("str"));
+ assertEquals(expectedStrField, row.getString("str"));
assertEquals((byte) 1, (Object) row.getByte("aByte"));
assertEquals((short) 2, (Object) row.getInt16("aShort"));
assertEquals((int) 3, (Object) row.getInt32("anInt"));
@@ -218,13 +222,8 @@ public class JavaFieldSchemaTest {
assertEquals("stringbuilder", row.getString("stringBuilder"));
}
- @Test
- public void testFromRow() throws NoSuchSchemaException {
- SchemaRegistry registry = SchemaRegistry.createDefault();
- Row row = createSimpleRow("string");
-
- SimplePOJO pojo = registry.getFromRowFunction(SimplePOJO.class).apply(row);
- assertEquals("string", pojo.str);
+ private static void verifySimplePOJO(String expectedStrField, SimplePOJO
pojo) {
+ assertEquals(expectedStrField, pojo.str);
assertEquals((byte) 1, pojo.aByte);
assertEquals((short) 2, pojo.aShort);
assertEquals((int) 3, pojo.anInt);
@@ -350,6 +349,23 @@ public class JavaFieldSchemaTest {
assertNull(pojo.uuid);
}
+ @Test
+ public void testToRow() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ SimplePOJO pojo = createSimple("string");
+ Row row = registry.getToRowFunction(SimplePOJO.class).apply(pojo);
+ verifySimpleRow("string", row);
+ }
+
+ @Test
+ public void testFromRow() throws NoSuchSchemaException {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row = createSimpleRow("string");
+
+ SimplePOJO pojo = registry.getFromRowFunction(SimplePOJO.class).apply(row);
+ verifySimplePOJO("string", pojo);
+ }
+
@Test
public void testNullableSchema() throws NoSuchSchemaException {
SchemaRegistry registry = SchemaRegistry.createDefault();
@@ -903,4 +919,156 @@ public class JavaFieldSchemaTest {
thrown.getMessage(),
containsString("TestPOJOs$FirstCircularNestedPOJO"));
}
+
+ @Test
+ public void testGenericPOJOSchema() throws Exception {
+ Schema actual =
+ SchemaRegistry.createDefault()
+ .getSchema(new
TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>>() {});
+ Schema expected =
+
genericPOJOSchema(FieldType.row(genericPOJOSchema(FieldType.row(SIMPLE_POJO_SCHEMA))));
+ assertSchemaEquivalent(expected, actual);
+ }
+
+ @Test
+ public void testGenericPOJOToRow() throws Exception {
+ Row row =
+ SchemaRegistry.createDefault()
+ .getToRowFunction(new
TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>>() {})
+
.apply(GenericPOJO.create(GenericPOJO.create(createSimple("string"))));
+
+ verifySimpleRow("string", row.getRow("t").getRow("t"));
+ }
+
+ @Test
+ public void testGenericPOJOFromRow() throws Exception {
+ FieldType innerGenericPOJOFieldType =
+ FieldType.row(genericPOJOSchema(FieldType.row(SIMPLE_POJO_SCHEMA)));
+ GenericPOJO<GenericPOJO<SimplePOJO>> actualPOJO =
+ SchemaRegistry.createDefault()
+ .getFromRowFunction(new
TypeDescriptor<GenericPOJO<GenericPOJO<SimplePOJO>>>() {})
+ .apply(
+ createGenericRow(
+ innerGenericPOJOFieldType,
+ createGenericRow(
+ FieldType.row(SIMPLE_POJO_SCHEMA),
createSimpleRow("string"))));
+
+ verifySimplePOJO("string", actualPOJO.t.t);
+ }
+
+ @Test
+ public void testGenericPOJOSchemaMapToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row =
+ registry
+ .getToRowFunction(
+ new TypeDescriptor<GenericPOJO<Map<String,
GenericPOJO<String>>>>() {})
+ .apply(
+ GenericPOJO.create(
+ ImmutableMap.<String, GenericPOJO<String>>builder()
+ .put("k1", GenericPOJO.create("v1"))
+ .put("k2", GenericPOJO.create("v2"))
+ .build()));
+
+ assertEquals("v1", row.<String, Row>getMap("t").get("k1").getString("t"));
+ assertEquals("v2", row.<String, Row>getMap("t").get("k2").getString("t"));
+ }
+
+ @Test
+ public void testGenericPOJOSchemaMapFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema.FieldType mapValueFieldType =
+ Schema.FieldType.row(genericPOJOSchema(Schema.FieldType.STRING));
+ GenericPOJO<Map<String, GenericPOJO<String>>> actual =
+ registry
+ .getFromRowFunction(
+ new TypeDescriptor<GenericPOJO<Map<String,
GenericPOJO<String>>>>() {})
+ .apply(
+ createGenericRow(
+ Schema.FieldType.map(Schema.FieldType.STRING,
mapValueFieldType),
+ ImmutableMap.<String, Row>builder()
+ .put("k1", createGenericRow(Schema.FieldType.STRING,
"v1"))
+ .put("k2", createGenericRow(Schema.FieldType.STRING,
"v2"))
+ .build()));
+
+ assertEquals("v1", actual.t.get("k1").t);
+ assertEquals("v2", actual.t.get("k2").t);
+ }
+
+ @Test
+ public void testGenericBeamSchemaIterableToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row =
+ registry
+ .getToRowFunction(new
TypeDescriptor<GenericPOJO<Iterable<GenericPOJO<String>>>>() {})
+ .apply(
+ GenericPOJO.create(
+ ImmutableList.<GenericPOJO<String>>builder()
+ .add(GenericPOJO.create("v1"))
+ .add(GenericPOJO.create("v2"))
+ .build()));
+
+ Row[] rows = Streams.stream(row.<Row>getIterable("t")).toArray(Row[]::new);
+
+ assertEquals("v1", rows[0].getString("t"));
+ assertEquals("v2", rows[1].getString("t"));
+ }
+
+ @Test
+ public void testGenericBeamSchemaIterableFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema.FieldType elementFieldType =
+ Schema.FieldType.row(genericPOJOSchema(Schema.FieldType.STRING));
+ GenericPOJO<Iterable<GenericPOJO<String>>> actual =
+ registry
+ .getFromRowFunction(new
TypeDescriptor<GenericPOJO<Iterable<GenericPOJO<String>>>>() {})
+ .apply(
+ createGenericRow(
+ Schema.FieldType.array(elementFieldType),
+ ImmutableList.<Row>builder()
+ .add(createGenericRow(Schema.FieldType.STRING, "v1"))
+ .add(createGenericRow(Schema.FieldType.STRING, "v2"))
+ .build()));
+ GenericPOJO<String>[] pojos =
Streams.stream(actual.t).toArray(GenericPOJO[]::new);
+ assertEquals("v1", pojos[0].t);
+ assertEquals("v2", pojos[1].t);
+ }
+
+ @Test
+ public void testGenericBeamSchemaArrayToRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Row row =
+ registry
+ .getToRowFunction(new
TypeDescriptor<GenericPOJO<GenericPOJO<String>[]>>() {})
+ .apply(
+ GenericPOJO.create(
+ new GenericPOJO[] {
+ GenericPOJO.create("v1"), GenericPOJO.create("v2"),
+ }));
+
+ Row[] rows = Streams.stream(row.<Row>getIterable("t")).toArray(Row[]::new);
+
+ assertEquals("v1", rows[0].getString("t"));
+ assertEquals("v2", rows[1].getString("t"));
+ }
+
+ @Test
+ public void testGenericBeamSchemaArrayFromRow() throws Exception {
+ SchemaRegistry registry = SchemaRegistry.createDefault();
+ Schema.FieldType elementFieldType =
+ Schema.FieldType.row(genericPOJOSchema(Schema.FieldType.STRING));
+ GenericPOJO<GenericPOJO<String>[]> actual =
+ registry
+ .getFromRowFunction(new
TypeDescriptor<GenericPOJO<GenericPOJO<String>[]>>() {})
+ .apply(
+ createGenericRow(
+ Schema.FieldType.array(elementFieldType),
+ ImmutableList.<Row>builder()
+ .add(createGenericRow(Schema.FieldType.STRING, "v1"))
+ .add(createGenericRow(Schema.FieldType.STRING, "v2"))
+ .build()));
+ GenericPOJO<String>[] pojos = actual.t;
+ assertEquals("v1", pojos[0].t);
+ assertEquals("v2", pojos[1].t);
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
index d8ed86f2b55..72db48ccf97 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestJavaBeans.java
@@ -1487,4 +1487,39 @@ public class TestJavaBeans {
.addLogicalTypeField("instant", new NanosInstant())
.addLogicalTypeField("uuid", SqlTypes.UUID)
.build();
+
+ @DefaultSchema(JavaBeanSchema.class)
+ public static class GenericBean<T> {
+ @Nullable private T t;
+
+ @Nullable
+ public T getT() {
+ return t;
+ }
+
+ public void setT(@Nullable T t) {
+ this.t = t;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof GenericBean)) {
+ return false;
+ }
+ GenericBean<?> that = (GenericBean<?>) o;
+ return Objects.equals(t, that.t);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(t);
+ }
+ }
+
+ public static Schema genericBeanSchema(FieldType genericFieldType) {
+ return Schema.builder().addNullableField("t", genericFieldType).build();
+ }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
index 38ec507480f..97f1bb20f42 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/TestPOJOs.java
@@ -1399,4 +1399,29 @@ public class TestPOJOs {
.addNullableLogicalTypeField("instant", new NanosInstant())
.addNullableLogicalTypeField("uuid", SqlTypes.UUID)
.build();
+
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class GenericPOJOWithCreator<T> {
+ public @Nullable T t;
+
+ @SchemaCreate
+ public GenericPOJOWithCreator(@Nullable T t) {
+ this.t = t;
+ }
+ }
+
+ @DefaultSchema(JavaFieldSchema.class)
+ public static class GenericPOJO<T> {
+ public @Nullable T t;
+
+ public static <T> GenericPOJO<T> create(T t) {
+ GenericPOJO<T> genericPOJO = new GenericPOJO<>();
+ genericPOJO.t = t;
+ return genericPOJO;
+ }
+ }
+
+ public static Schema genericPOJOSchema(FieldType tFieldType) {
+ return Schema.builder().addNullableField("t", tFieldType).build();
+ }
}
diff --git
a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
index 3e22bad6a41..bf7078abc58 100644
---
a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
+++
b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java
@@ -533,7 +533,9 @@ public class ArrowConversion {
throw new IllegalStateException("There are no more Rows.");
}
Row result =
- Row.withSchema(schema).withFieldValueGetters(this.fieldValueGetters,
this.currRowIndex);
+ Row.withSchema(schema)
+ .withFieldValueGetters(
+ this.fieldValueGetters, this.currRowIndex,
TypeDescriptor.of(Integer.class));
this.currRowIndex += 1;
return result;
}
diff --git
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
index 21b5c7bcf97..6b3ff6cac86 100644
---
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
+++
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsSchemaProvider.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaUtils.SdkBuilderSetter;
import org.apache.beam.sdk.io.aws2.schemas.AwsTypes.ConverterFactory;
import org.apache.beam.sdk.schemas.CachingFactory;
@@ -205,7 +206,11 @@ public class AwsSchemaProvider extends
GetterBasedSchemaProviderV2 {
@Override
public List<FieldValueTypeInformation> fieldValueTypeInformations(
TypeDescriptor<?> targetTypeDescriptor, Schema schema) {
- throw new UnsupportedOperationException("FieldValueTypeInformation not
available");
+ List<SdkField<?>> sdkFieldList = sdkFields((Class)
targetTypeDescriptor.getRawType());
+
+ return sdkFieldList.stream()
+ .map(AwsTypes::fieldValueTypeInformationFor)
+ .collect(Collectors.toList());
}
@Override
diff --git
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
index a0fc0c8e91c..f5b06d3cd1c 100644
---
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
+++
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/schemas/AwsTypes.java
@@ -27,12 +27,14 @@ import static
software.amazon.awssdk.core.protocol.MarshallingType.SDK_BYTES;
import static software.amazon.awssdk.core.protocol.MarshallingType.SDK_POJO;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.schemas.Factory;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -91,6 +93,20 @@ public class AwsTypes {
String.format("Type %s of field %s is unknown.", type,
normalizedNameOf(field)));
}
+ static FieldValueTypeInformation fieldValueTypeInformationFor(SdkField<?>
sdkField) {
+ TypeDescriptor<?> type =
TypeDescriptor.of(sdkField.marshallingType().getTargetClass());
+ return FieldValueTypeInformation.builder()
+ .setName(normalizedNameOf(sdkField))
+ .setType(type)
+ .setRawType(sdkField.marshallingType().getClass())
+
.setElementType(FieldValueTypeInformation.getIterableComponentType(type))
+ .setMapKeyType(FieldValueTypeInformation.getMapKeyType(type))
+ .setMapValueType(FieldValueTypeInformation.getMapValueType(type))
+ .setOneOfTypes(Collections.emptyMap())
+ .setNullable(true)
+ .build();
+ }
+
private static Schema schemaFor(List<SdkField<?>> fields, Set<Class<?>>
seen) {
Schema.Builder builder = Schema.builder();
for (SdkField<?> sdkField : fields) {