Hi everyone,

Every article and talk about Beam recommends using Schemas and Row. However, using Row throughout my pipelines makes code much harder to refactor when schemas change than using POJOs/beans, which get static code analysis support in the IDE.
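To make the contrast concrete, here is a minimal sketch (a hypothetical two-field schema, using the Row builder API as I understand it):

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

// With Row, field names are plain strings: a rename or restructuring
// still compiles fine and only fails at runtime.
Schema person = Schema.builder()
    .addStringField("firstName")
    .addStringField("lastname")
    .build();
Row row = Row.withSchema(person)
    .addValues("Ada", "Lovelace")
    .build();
String first = row.getString("firstName"); // no IDE help if "firstName" moves

// With a POJO, the same access is pojo.getFirstName(): the IDE can find
// every usage and rename it safely.
```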
Does anyone have any tips or tricks to make refactoring easier when using Row?

For example: the initial pipeline transforms used a Person schema, Row {firstName: string, lastname: string}. There are some steps that do filtering and various other things. Now we realize that the Kafka metadata from the Persons topic is also important, so our schema becomes Row {metadata: Row {..whatever fields..}, item: Row {firstName: string, lastname: string}}. What would be an easy way to figure out which code changes I need to make to effectively add an "item." prefix in front of the fields previously accessed by name? Search and replace isn't ideal, and in more complex pipelines it quickly becomes very difficult to figure out which fields come from which nested rows and whether they need to change during a refactoring. In fact, the only way I've found to refactor is to run the pipeline multiple times, inspect the exact schema at a given line of code, change that line accordingly, then restart the process with the breakpoint a little further along the pipeline.

One solution I thought of is to keep Row in my PTransforms' signatures, but make the first and last steps of those PTransforms Convert.to(POJO.class) and Convert.toRows(). Basically, this provides a static context inside the transforms. However, most of my schemas are derived from Avro schemas, and when I add Avro objects to my POJOs, Convert.to(POJO) throws a StackOverflowError ([1] stack trace at the end of the email).

I suppose I could AutoValue the properties of my Avro objects and use those automatically generated AutoValue POJOs inside my PTransforms. However, that means I have to keep my Avro definitions and my AutoValue classes in sync.

I understand why Schemas and Row are important, especially after hearing Andrew talk about them at the 2021 Beam Summit [3]. Still, using Row feels a lot like using DataTable [2] in .NET, and that brings back refactoring nightmares.
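For reference, the boundary pattern I have in mind looks roughly like this (PersonPojo is a hypothetical bean mirroring the Row schema; the middle step is just a placeholder):

```java
import org.apache.beam.sdk.schemas.JavaBeanSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

// Hypothetical bean mirroring the Row schema; keeping this in sync with
// the Avro definition is exactly the duplication I'd like to avoid.
@DefaultSchema(JavaBeanSchema.class)
public class PersonPojo {
  private String firstName;
  private String lastname;
  public String getFirstName() { return firstName; }
  public void setFirstName(String v) { firstName = v; }
  public String getLastname() { return lastname; }
  public void setLastname(String v) { lastname = v; }
}

// Inside the PTransform: Rows at the edges, statically typed code in between.
PCollection<Row> output =
    input
        .apply(Convert.to(PersonPojo.class))  // Row -> PersonPojo
        .apply(MapElements.into(TypeDescriptor.of(PersonPojo.class))
            .via(p -> { p.setFirstName(p.getFirstName().trim()); return p; }))
        .apply(Convert.toRows());             // PersonPojo -> Row
```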
Thanks,
Cristian

[1] Exception in thread "main" java.lang.StackOverflowError
  at java.base/java.util.HashMap.hash(HashMap.java:339)
  at java.base/java.util.HashMap.remove(HashMap.java:794)
  at java.base/java.util.HashSet.remove(HashSet.java:236)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:88)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitClass(TypeResolver.java:391)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:79)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.visitParameterizedType(TypeResolver.java:403)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeVisitor.visit(TypeVisitor.java:77)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver$TypeMappingIntrospector.getTypeMappings(TypeResolver.java:384)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeResolver.covariantly(TypeResolver.java:75)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getCovariantTypeResolver(TypeToken.java:1181)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.resolveSupertype(TypeToken.java:273)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken.getSupertype(TypeToken.java:398)
  at org.apache.beam.sdk.values.TypeDescriptor.getSupertype(TypeDescriptor.java:188)
  at org.apache.beam.sdk.schemas.utils.ReflectUtils.getIterableComponentType(ReflectUtils.java:203)
  at org.apache.beam.sdk.schemas.FieldValueTypeInformation.getIterableComponentType(FieldValueTypeInformation.java:257)
  at org.apache.beam.sdk.schemas.FieldValueTypeInformation.forGetter(FieldValueTypeInformation.java:172)
  at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
  at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
  at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
  at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
  at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
  at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
  at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
  at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
  at org.apache.beam.sdk.schemas.JavaBeanSchema$GetterTypeSupplier.get(JavaBeanSchema.java:67)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:92)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.fieldFromType(StaticSchemaInference.java:166)
  at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:93)

[2] https://docs.microsoft.com/en-us/dotnet/api/system.data.datatable?view=net-5.0
[3] https://www.youtube.com/watch?v=4rDZ0b0TOvc