Hi everyone,

I'm not sure whether anyone else is using Beam with the Flink runner and AutoValue builders; for me, it doesn't work. I have some questions and a workaround for anyone in the same boat.
Versions: Beam 2.31, Flink 1.13, AutoValue 1.8.2

Here's the code:

package org.whatever.testing;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Arrays;

public class App {
  public static void main(String[] args) {
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    var p = Pipeline.create(options);
    p
        .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
        .apply(Convert.to(FooAutoValue.class))
        .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
          System.out.println(i.toString());
          return i;
        }));
    p.run().waitUntilFinish();
  }

  @AutoValue
  @DefaultSchema(AutoValueSchema.class)
  public static abstract class FooAutoValue {
    public abstract String getDummyProp();

//    @SchemaCreate
//    public static FooAutoValue create(String dummyProp) {
//      return builder()
//          .setDummyProp(dummyProp)
//          .build();
//    }

    public static Builder builder() {
      return new AutoValue_App_FooAutoValue.Builder();
    }

    @AutoValue.Builder
    public abstract static class Builder {
      public abstract Builder setDummyProp(String newDummyProp);

      public abstract FooAutoValue build();
    }
  }
}

Note that it doesn't matter whether FooAutoValue is an inner class or a top-level class in its own file. For simplicity I'm converting the objects to the same class here; in production code the input is of another type with an equivalent schema.
And the stack trace:

Caused by: java.lang.IllegalAccessError: failed to access class org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed module of loader 'app'; org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @26f4afda)
    at org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown Source)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
    at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
    at org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
    at org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
    at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:829)

Workaround:
- Uncomment the @SchemaCreate create method.
- Compile the code with "-parameters". If using Maven:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <version>3.8.1</version>
  <configuration>
    <compilerArgs>
      <arg>-parameters</arg>
    </compilerArgs>
    ....

My questions:

1. Why does Beam try to get the generated AutoValue builder through reflection, using AutoValueUtils.getAutoValueGeneratedName (which builds "AutoValue_App_FooAutoValue$Builder"), instead of calling FooAutoValue.builder() directly without reflection?

2. Given the fancy classloader work Flink does [1], AutoValueUtils.createBuilderCreator calls ReflectHelpers.findClassLoader(), which returns the thread's context classloader; in this case that's a ChildFirstClassLoader. However, the generated builder class is package-private and was loaded by the app classloader. Is there a reason why Beam registers the generated coder on the thread classloader?

As usual, if this is deemed a valid bug, please let me know and I can eventually make a PR fixing it.

Thank you,
Cristian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
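For anyone wondering why "-parameters" is part of the workaround: as far as I understand it, schema inference matches the @SchemaCreate method's parameters to schema fields by name, and without that flag javac only records synthetic names like "arg0". This standalone sketch (class and method names are mine, not Beam's) shows what reflection sees in each case:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class ParameterNamesDemo {

    // Stand-in for a @SchemaCreate-style factory method whose parameter
    // names a framework would want to read reflectively.
    public static String create(String dummyProp) {
        return dummyProp;
    }

    public static void main(String[] args) throws Exception {
        Method m = ParameterNamesDemo.class.getMethod("create", String.class);
        for (Parameter p : m.getParameters()) {
            // Compiled WITHOUT -parameters: isNamePresent() is false and
            // getName() returns a synthetic name such as "arg0".
            // Compiled WITH -parameters: isNamePresent() is true and
            // getName() returns the real name "dummyProp".
            System.out.println(p.getName() + " (real name present: " + p.isNamePresent() + ")");
        }
    }
}
```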
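To make question 2 concrete: package-private access is checked against the runtime package, which is the pair (package name, defining classloader), so the same class name defined by two loaders ends up in two different runtime packages. Here is a self-contained sketch of that split, with no Beam or Flink involved (the loader here only crudely imitates child-first behavior by re-defining a class from its own bytes):

```java
import java.io.InputStream;

public class RuntimePackageDemo {

    // A loader that defines a class from its .class bytes itself instead of
    // delegating to the parent, so the copy lives in a different runtime package.
    public static class CopyLoader extends ClassLoader {
        public CopyLoader(ClassLoader parent) {
            super(parent);
        }

        public Class<?> copy(String name) throws Exception {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            }
        }
    }

    public static class Foo {
    }

    public static void main(String[] args) throws Exception {
        Class<?> copied = new CopyLoader(RuntimePackageDemo.class.getClassLoader())
                .copy(RuntimePackageDemo.Foo.class.getName());
        // Same fully qualified name, but a different runtime class: code defined
        // by one loader cannot touch package-private members of classes defined
        // by the other, which is the IllegalAccessError in the stack trace above.
        System.out.println("same class: " + (copied == RuntimePackageDemo.Foo.class));
    }
}
```

That's why the generated creator, defined via the ChildFirstClassLoader, cannot call the package-private AutoValue_App_FooAutoValue$Builder that the app loader defined.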