Hi Reuven,

Thanks for the quick reply.

Could you elaborate on why Beam needs to create the Builder dynamically
through reflection (essentially, using reflection to instantiate a
package-private class)? As far as AutoValue goes, it looks like an
anti-pattern to instantiate the generated builder by referencing the
AutoValue-generated class (AutoValue_App_FooAutoValue in this case)
directly. Normally, I think, the only code that calls the generated
builder's constructor is the user's abstract class (FooAutoValue), through:

public static Builder builder() {
    return new AutoValue_App_FooAutoValue.Builder();
}

In fact, this method is called directly when using the @SchemaCreate
method, regardless of whether the create method itself is invoked through
reflection. I guess what I'm asking is: couldn't Beam call
FooAutoValue.builder() dynamically, if calling it directly is not possible?


On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax <re...@google.com> wrote:

> Beam needs to create these elements dynamically when decoding records, so
> it can't easily call the builder directly.
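The following makes the constraint concrete. When decoding, the schema machinery has only the decoded field values and the target Class, so the call to the user's factory has to be resolved at runtime. This is a rough sketch under that assumption, using plain reflection on a static factory. Judging by the SchemaCodeGen class in the stack trace further down, Beam generates bytecode for its creator instead. RuntimeCreator and fromStaticFactory are hypothetical names.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

public class RuntimeCreatorDemo {

    // Hypothetical creator: builds an instance from decoded field values.
    interface RuntimeCreator {
        Object create(Object... fieldValues);
    }

    // Example user type with a public static factory (like a @SchemaCreate method).
    public static final class Foo {
        private final String dummyProp;

        private Foo(String dummyProp) {
            this.dummyProp = dummyProp;
        }

        public static Foo create(String dummyProp) {
            return new Foo(dummyProp);
        }

        public String getDummyProp() {
            return dummyProp;
        }
    }

    // Finds a public static method with the given name and arity on `type` and wraps it.
    // Nothing about Foo is referenced statically here; it is all resolved at runtime.
    static RuntimeCreator fromStaticFactory(Class<?> type, String factoryName, int arity) {
        Method factory = Arrays.stream(type.getMethods())
                .filter(m -> m.getName().equals(factoryName))
                .filter(m -> Modifier.isStatic(m.getModifiers()))
                .filter(m -> m.getParameterCount() == arity)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("no factory " + factoryName));
        return fieldValues -> {
            try {
                return factory.invoke(null, fieldValues);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        };
    }

    public static void main(String[] args) {
        RuntimeCreator creator = fromStaticFactory(Foo.class, "create", 1);
        Foo foo = (Foo) creator.create("dummy");
        System.out.println(foo.getDummyProp()); // prints "dummy"
    }
}
```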
>
> My first guess is that there's a classloader issue here. Flink does some
> fancy classloader munging, and that might be breaking an assumption in this
> code. Passing in the correct classloader should hopefully fix this.
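Here is a small JDK-only illustration of the classloader point. The thread context classloader can be a different loader from the one that defined the user's classes. In the JVM, package-private access requires the same runtime package, meaning the same package name and the same defining loader. A class generated into the context loader therefore cannot see a package-private class defined by the app loader. The URLClassLoader below is only a stand-in for Flink's ChildFirstClassLoader, and loaderFor is a hypothetical helper.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderChoiceDemo {

    // Stand-in for a user type such as FooAutoValue.
    static class UserType {}

    // Hypothetical helper: prefer the loader that defined the user type,
    // and fall back to the thread context loader only if it is the bootstrap loader.
    static ClassLoader loaderFor(Class<?> userType) {
        ClassLoader defining = userType.getClassLoader();
        return defining != null ? defining : Thread.currentThread().getContextClassLoader();
    }

    public static void main(String[] args) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        // Simulate a runner installing its own loader as the context loader.
        try (URLClassLoader runnerLoader = new URLClassLoader(new URL[0], original)) {
            current.setContextClassLoader(runnerLoader);
            ClassLoader contextLoader = current.getContextClassLoader();
            ClassLoader userLoader = loaderFor(UserType.class);
            // prints "context loader == defining loader? false"
            System.out.println("context loader == defining loader? " + (contextLoader == userLoader));
        } finally {
            current.setContextClassLoader(original);
        }
    }
}
```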
>
> Reuven
>
>
> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu <zei...@gmail.com>
> wrote:
>
>> Hi everyone,
>>
>> Not sure if anyone is using Beam with the Flink Runner and AutoValue
>> builders. For me, it doesn't work. I have some questions and a workaround
>> for anyone in the same boat.
>>
>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>
>> Here's the code:
>>
>> package org.whatever.testing;
>>
>> import com.google.auto.value.AutoValue;
>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>> import org.apache.beam.sdk.schemas.transforms.Convert;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.MapElements;
>> import org.apache.beam.sdk.values.TypeDescriptor;
>>
>> import java.util.Arrays;
>>
>> public class App {
>>
>>     public static void main(String[] args) {
>>         var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>
>>         var p = Pipeline.create(options);
>>         p
>>                 .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
>>                 .apply(Convert.to(FooAutoValue.class))
>>                 .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>>                     System.out.println(i.toString());
>>                     return i;
>>                 }));
>>         p.run().waitUntilFinish();
>>     }
>>     @AutoValue
>>     @DefaultSchema(AutoValueSchema.class)
>>     public static abstract class FooAutoValue {
>>         public abstract String getDummyProp();
>>
>> //        @SchemaCreate
>> //        public static FooAutoValue create(String dummyProp) {
>> //            return builder()
>> //                    .setDummyProp(dummyProp)
>> //                    .build();
>> //        }
>>
>>         public static Builder builder() {
>>             return new AutoValue_App_FooAutoValue.Builder();
>>         }
>>
>>         @AutoValue.Builder
>>         public abstract static class Builder {
>>             public abstract Builder setDummyProp(String newDummyProp);
>>
>>             public abstract FooAutoValue build();
>>         }
>>     }
>> }
>>
>> Note that it doesn't matter whether FooAutoValue is a nested class or a
>> top-level class in its own file. For simplicity, I'm converting the
>> objects to the same class here; in production code the input is of
>> another type with an equivalent schema.
>>
>> And the stack trace:
>>
>> Caused by: java.lang.IllegalAccessError: failed to access class org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed module of loader 'app'; org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @26f4afda)
>>     at org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown Source)
>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
>>     at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>     at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
>>     at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
>>     at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
>>     at org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
>>     at org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
>>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
>>     at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
>>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>
>>
>> Workaround:
>> - Uncomment the @SchemaCreate method.
>> - Compile the code with "-parameters". If using Maven:
>>
>> <plugin>
>>     <groupId>org.apache.maven.plugins</groupId>
>>     <artifactId>maven-compiler-plugin</artifactId>
>>     <version>3.8.1</version>
>>     <configuration>
>>         <compilerArgs>
>>             <arg>-parameters</arg>
>>         </compilerArgs>
>> ....
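This is my understanding of why "-parameters" matters, not verified against the Beam source. A @SchemaCreate method's parameters are matched to schema fields by name. javac only stores parameter names in the class file when compiling with "-parameters". The JDK-only snippet below shows the difference via java.lang.reflect.Parameter. Its create method mirrors the commented-out factory above, and firstParameterName is a hypothetical helper.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class ParameterNamesDemo {

    // Same shape as the @SchemaCreate factory in the thread: create(String dummyProp).
    public static String create(String dummyProp) {
        return dummyProp;
    }

    // Returns the recorded parameter name, or the synthesized "argN" name plus a marker.
    static String firstParameterName() throws NoSuchMethodException {
        Method m = ParameterNamesDemo.class.getMethod("create", String.class);
        Parameter p = m.getParameters()[0];
        // isNamePresent() is true only if the class file carries a MethodParameters
        // attribute, which javac emits when given -parameters.
        return p.isNamePresent() ? p.getName() : p.getName() + " (no -parameters)";
    }

    public static void main(String[] args) throws NoSuchMethodException {
        // With -parameters this prints "dummyProp"; without it, "arg0 (no -parameters)".
        System.out.println(firstParameterName());
    }
}
```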
>>
>>
>> My questions:
>> 1. Why is Beam trying to get the generated AutoValue Builder through
>> reflection using AutoValueUtils.getAutoValueGeneratedName (which builds
>> "AutoValue_App_FooAutoValue$Builder") instead of calling
>> FooAutoValue.builder() directly, without reflection?
>> 2. With Flink, given the fancy classloader work Flink does [1],
>> AutoValueUtils.createBuilderCreator calls ReflectHelpers.findClassLoader(),
>> which returns the thread context classloader; in this case that's a
>> ChildFirstClassLoader. However, the generated builder is a package-private
>> class in the app classloader. Is there a reason why Beam defines the
>> generated creator class (SchemaUserTypeCreator$SchemaCodeGen in the
>> trace) in the thread context classloader?
>>
>> As per usual, if this is deemed a valid bug, please let me know and I can
>> eventually make a PR fixing it.
>>
>> Thank you,
>> Cristian
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
>>
>
