I noticed that you had another question on this thread. If I understand correctly, the answer to your question is that Beam's conversion of objects to/from Row uses bytecode generation for performance, and it automatically figures out how to map the columns of the Row to builder methods. But I may be misunderstanding which part of SchemaCoder is involved here. Mostly just bumping this thread.

Kenn
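To make that concrete, here is a rough, hand-written sketch of what such a bytecode-generated creator is expected to do for the FooAutoValue class from the original post below. The class name IllustrativeFooAutoValueCreator and the exact shape are assumptions for illustration only, not Beam's actual generated output.

    package org.whatever.testing;

    // Hand-written illustration only, not Beam's generated code: the generated
    // creator roughly has to take the field values decoded from a Row (in schema
    // order), feed them into the AutoValue builder, and call build().
    public class IllustrativeFooAutoValueCreator {

        public App.FooAutoValue create(Object... fieldValues) {
            // This reference is the crux of the problem discussed below:
            // AutoValue_App_FooAutoValue.Builder is package-private, so it is only
            // accessible from code in the same runtime package, i.e. the same
            // package name *and* the same classloader.
            AutoValue_App_FooAutoValue.Builder builder =
                new AutoValue_App_FooAutoValue.Builder();
            builder.setDummyProp((String) fieldValues[0]);
            return builder.build();
        }
    }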
On Tue, Oct 26, 2021 at 11:29 AM Cristian Constantinescu <[email protected]> wrote:

> Hi Reuven,
>
> Thanks for the quick reply.
>
> Could you elaborate on why Beam needs to create the Builder dynamically
> through reflection (essentially using reflection to create an instance of a
> package-private class)? As far as AutoValue goes, it looks like an
> anti-pattern to create an instance of the generated builder by calling the
> AutoValue-generated class (AutoValue_App_FooAutoValue in this case).
> Normally, the only place that can call the generated builder constructor is
> the user's abstract class (FooAutoValue), through:
>
> public static Builder builder() {
>   return new AutoValue_App_FooAutoValue.Builder();
> }
>
> In fact, this method is called directly when using the @SchemaCreate
> method, regardless of whether the create method is itself invoked through
> reflection. I guess what I'm asking is: could Beam not call
> FooAutoValue.builder() dynamically if calling it directly is not possible?
>
>
> On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax <[email protected]> wrote:
>
>> Beam needs to create these elements dynamically when decoding records,
>> so it can't easily call the builder directly.
>>
>> My first guess is that there's a classloader issue here. Flink does some
>> fancy classloader munging, and that might be breaking an assumption in
>> this code. Passing in the correct classloader should hopefully fix this.
>>
>> Reuven
>>
>>
>> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu <
>> [email protected]> wrote:
>>
>>> Hi everyone,
>>>
>>> Not sure if anyone is using Beam with the Flink runner and AutoValue
>>> builders. For me, it doesn't work. I have some questions and a workaround
>>> for anyone in the same boat.
>>>
>>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>>
>>> Here's the code:
>>>
>>> package org.whatever.testing;
>>>
>>> import com.google.auto.value.AutoValue;
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>>> import org.apache.beam.sdk.schemas.transforms.Convert;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.transforms.MapElements;
>>> import org.apache.beam.sdk.values.TypeDescriptor;
>>>
>>> import java.util.Arrays;
>>>
>>> public class App {
>>>
>>>     public static void main(String[] args) {
>>>         var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>
>>>         var p = Pipeline.create(options);
>>>         p
>>>             .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
>>>             .apply(Convert.to(FooAutoValue.class))
>>>             .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>>>                 System.out.println(i.toString());
>>>                 return i;
>>>             }));
>>>         p.run().waitUntilFinish();
>>>     }
>>>
>>>     @AutoValue
>>>     @DefaultSchema(AutoValueSchema.class)
>>>     public static abstract class FooAutoValue {
>>>         public abstract String getDummyProp();
>>>
>>>         // @SchemaCreate
>>>         // public static FooAutoValue create(String dummyProp) {
>>>         //     return builder()
>>>         //         .setDummyProp(dummyProp)
>>>         //         .build();
>>>         // }
>>>
>>>         public static Builder builder() {
>>>             return new AutoValue_App_FooAutoValue.Builder();
>>>         }
>>>
>>>         @AutoValue.Builder
>>>         public abstract static class Builder {
>>>             public abstract Builder setDummyProp(String newDummyProp);
>>>
>>>             public abstract FooAutoValue build();
>>>         }
>>>     }
>>> }
>>>
>>> Note that it doesn't matter whether FooAutoValue is an inner class or a
>>> top-level class in its own file. For simplicity, I'm converting the objects
>>> to the same class here; in production code the input is of another type
>>> with an equivalent schema.
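On Cristian's question upthread about having Beam go through FooAutoValue.builder() instead of the generated class: below is a minimal sketch of what that could look like. It is a hypothetical illustration, not Beam code; it only shows that construction can be driven entirely through the public factory method and the public abstract Builder, so the package-private generated class is never referenced directly and the classloader boundary stops mattering.

    package org.whatever.testing;

    import java.lang.reflect.Method;

    import org.whatever.testing.App.FooAutoValue;

    // Hypothetical sketch: build a FooAutoValue reflectively using only public
    // entry points (the static builder() factory and the abstract Builder methods).
    public class BuilderViaPublicApi {

        public static FooAutoValue create(String dummyProp) throws Exception {
            // FooAutoValue.builder() is public static, so any classloader may call it.
            Method factory = FooAutoValue.class.getMethod("builder");
            Object builder = factory.invoke(null); // instance of the generated Builder subclass

            // Resolve the setter and build() against the public abstract Builder type,
            // not the package-private generated subclass, so the reflective access
            // check passes regardless of which classloader the caller lives in.
            FooAutoValue.Builder.class.getMethod("setDummyProp", String.class)
                .invoke(builder, dummyProp);
            return (FooAutoValue) FooAutoValue.Builder.class.getMethod("build").invoke(builder);
        }
    }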
>>>
>>> And the stack trace:
>>>
>>> Caused by: java.lang.IllegalAccessError: failed to access class
>>> org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
>>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
>>> (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
>>> module of loader 'app';
>>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
>>> unnamed module of loader org.apache.flink.util.ChildFirstClassLoader
>>> @26f4afda)
>>>     at org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown Source)
>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
>>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>>     at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
>>>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
>>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
>>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
>>>     at org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
>>>     at org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
>>>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
>>>     at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
>>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>> Workaround:
>>> - Uncomment the @SchemaCreate method.
>>> - Compile the code with "-parameters"; if using Maven:
>>>
>>> <plugin>
>>>     <groupId>org.apache.maven.plugins</groupId>
>>>     <artifactId>maven-compiler-plugin</artifactId>
>>>     <version>3.8.1</version>
>>>     <configuration>
>>>         <compilerArgs>
>>>             <arg>
>>>                 -parameters
>>>             </arg>
>>>         </compilerArgs>
>>>         ....
>>>
>>> My questions:
>>> 1. Why is Beam trying to get the generated AutoValue Builder through
>>> reflection using AutoValueUtils.getAutoValueGeneratedName (which builds
>>> "AutoValue_App_FooAutoValue$Builder") instead of calling
>>> FooAutoValue.builder() directly, without reflection?
>>> 2. Given the fancy classloader work Flink does [1], in
>>> AutoValueUtils.createBuilderCreator there is a call to
>>> ReflectHelpers.findClassLoader(), which returns the thread classloader; in
>>> this case it's a ChildFirstClassLoader. However, the generated builder is
>>> package-private and lives in the app classloader. Is there a reason why
>>> Beam registers the generated coder on the thread classloader?
>>>
>>> As usual, if this is deemed a valid bug, please let me know and I can
>>> eventually make a PR fixing it.
>>>
>>> Thank you,
>>> Cristian
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
>>
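For anyone who wants to copy the workaround directly, here it is spelled out: the original poster's class with the @SchemaCreate factory uncommented and FooAutoValue lifted to a top-level class (in which case AutoValue names the generated class AutoValue_FooAutoValue). Combined with the -parameters compiler flag from the Maven snippet above, this gives Beam a public creation method to call, so the package-private generated builder never has to be touched from another classloader.

    package org.whatever.testing;

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.schemas.AutoValueSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
    import org.apache.beam.sdk.schemas.annotations.SchemaCreate;

    @AutoValue
    @DefaultSchema(AutoValueSchema.class)
    public abstract class FooAutoValue {

        public abstract String getDummyProp();

        // Beam matches the parameter name "dummyProp" against the schema field,
        // which is why the code must be compiled with -parameters.
        @SchemaCreate
        public static FooAutoValue create(String dummyProp) {
            return builder()
                .setDummyProp(dummyProp)
                .build();
        }

        public static Builder builder() {
            return new AutoValue_FooAutoValue.Builder();
        }

        @AutoValue.Builder
        public abstract static class Builder {
            public abstract Builder setDummyProp(String newDummyProp);

            public abstract FooAutoValue build();
        }
    }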
