Hi everyone,

Every article and talk about Beam recommends using Schemas and Row.
However, using Row throughout my pipelines makes refactoring much harder
when schemas change, compared to POJOs/Beans, which give the IDE enough
static type information to help.

Does anyone have any tips or tricks to make refactoring easier when using
Row?

For example: The initial pipeline transforms used a Person schema Row
{firstName:string, lastname:string}. There are some steps that do filtering,
and various other things. Now we realize that the Kafka metadata from the
Persons topic is also important, so our Row schema becomes Row {metadata:
Row {..whatever fields..}, item: Row {firstName: string, lastname:string}}.
What would be an easy way to figure out what code changes I need to make to
effectively add an "item." in front of the previous fields accessed by name?
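To make the kind of change concrete, here's a sketch of one pattern that has helped me: funnel every field access through a small accessor class, so when the schema gains the "item" wrapper only the accessors change, not every call site. (PersonAccess is a made-up helper, and a plain Java Map stands in for Beam's Row here so the sketch is self-contained; the idea is the same with Row.getRow/getString.)

```java
// Hypothetical accessor layer: call sites never spell out field paths.
// When the schema nests everything under "item", only this class changes.
final class PersonAccess {
    // Before the Kafka-metadata change this was just:
    //   return (String) row.get("firstName");

    @SuppressWarnings("unchecked")
    static String firstName(java.util.Map<String, Object> row) {
        // After: unwrap the nested "item" row first.
        java.util.Map<String, Object> item =
            (java.util.Map<String, Object>) row.get("item");
        return (String) item.get("firstName");
    }

    @SuppressWarnings("unchecked")
    static String lastname(java.util.Map<String, Object> row) {
        java.util.Map<String, Object> item =
            (java.util.Map<String, Object>) row.get("item");
        return (String) item.get("lastname");
    }
}
```

It's still stringly-typed underneath, but at least the field paths live in one place the IDE can find.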

Search and replace isn't ideal, and for more complex pipelines it quickly
becomes very difficult to figure out which fields come from which nested
rows and whether they need to change during a refactoring. In fact, the
only way I've found to refactor is to run the pipeline multiple times,
inspect the exact schema at a given line of code with a breakpoint, change
that line accordingly, then restart the process with the breakpoint a
little further along the pipeline.

One solution I've thought of is to keep Row in my PTransforms' signatures,
but make the first and last steps of each transform Convert.to(POJO) and
Convert.toRows(schema). That basically gives me a static context inside
the transforms. However, most of my schemas are derived from Avro schemas,
and when I add Avro objects to my POJOs, Convert.to(POJO) throws a
StackOverflowError ([1] stack trace at the end of the email).
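The shape I'm after looks roughly like this (Person and the fromRow/toRow helpers are hypothetical, and a plain Java Map again stands in for Row so the sketch is self-contained; in the real pipeline the conversions would be Convert.to(Person.class) on the way in and Convert.toRows() on the way out):

```java
// Hypothetical POJO mirroring the Person schema; inside the transform
// everything is statically typed, so IDE renames and find-usages work.
final class Person {
    final String firstName;
    final String lastname;

    Person(String firstName, String lastname) {
        this.firstName = firstName;
        this.lastname = lastname;
    }
}

// Boundary conversions at the edges of the transform: dynamic Row-like
// data on the outside, a typed POJO on the inside.
final class Boundary {
    static Person fromRow(java.util.Map<String, Object> row) {
        return new Person((String) row.get("firstName"),
                          (String) row.get("lastname"));
    }

    static java.util.Map<String, Object> toRow(Person p) {
        java.util.Map<String, Object> row = new java.util.HashMap<>();
        row.put("firstName", p.firstName);
        row.put("lastname", p.lastname);
        return row;
    }
}
```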

I guess I could AutoValue the properties of my Avro objects and use those
automatically generated AutoValue POJOs inside my PTransforms. However,
that means I have to keep my Avro definitions and my AutoValue classes in
sync.

I understand why Schemas and Row are important, especially after hearing
Andrew talk about them at the 2021 Beam Summit [3]. However, using Row
feels a lot like using DataTable [2] in .NET, and that brings back
refactoring nightmares.

Thanks,
Cristian

[1]
Exception in thread "main" java.lang.StackOverflowError
    at java.base/java.util.HashMap.hash(HashMap.java:339)
    at java.base/java.util.HashMap.remove(HashMap.java:794)
    at java.base/java.util.HashSet.remove(HashSet.java:236)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:88)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.getTypeMappings(TypeResolver.java:384)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver.covariantly(TypeResolver.java:75)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getCovariantTypeResolver(TypeToken.java:1181)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.resolveSupertype(TypeToken.java:273)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getSupertype(TypeToken.java:398)
    at org.apache.beam.sdk.values.TypeDescriptor.getSupertype(TypeDescriptor.java:188)
    at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:203)
    at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:257)
    at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:172)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:67)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:92)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
    at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)

[2]
https://docs.microsoft.com/en-us/dotnet/api/system.data.datatable?view=net-5.0
[3] https://www.youtube.com/watch?v=4rDZ0b0TOvc

Reply via email to