I noticed you had another question on this thread. If I understand
correctly, the answer to your question is that Beam's conversion of objects
to/from Row uses bytecode generation for performance, and it automatically
figures out how to map the Row's columns to builder methods. But I may be
misunderstanding which part of SchemaCoder is involved here. Mostly just
bumping this thread.
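
Roughly speaking, my understanding is that the generated creator ends up
doing the moral equivalent of the hand-written mapping below (illustrative
only, not the actual generated code; the fromRow name is just for the
example, and it uses the FooAutoValue type from later in this thread):

    static FooAutoValue fromRow(org.apache.beam.sdk.values.Row row) {
        // The generated code matches each Row column to the corresponding
        // builder setter; here that is just the single "dummyProp" field.
        return FooAutoValue.builder()
                .setDummyProp(row.getString("dummyProp"))
                .build();
    }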

Kenn

On Tue, Oct 26, 2021 at 11:29 AM Cristian Constantinescu <[email protected]>
wrote:

> Hi Reuven,
>
> Thanks for the quick reply.
>
> Could you elaborate on why Beam needs to create the Builder dynamically
> through reflection (essentially using reflection to instantiate a
> package-private class)? As far as AutoValue goes, it looks like an
> anti-pattern to instantiate the generated builder by referencing the
> AutoValue-generated class (AutoValue_App_FooAutoValue in this case). I
> think that normally the only place that can call the generated builder's
> constructor is the user's abstract class (FooAutoValue), through:
>
> public static Builder builder() {
>             return new AutoValue_App_FooAutoValue.Builder();
>         }
>
> In fact, this method is called directly when using the @SchemaCreate
> approach, regardless of whether the create method is invoked through
> reflection or not. I guess what I'm asking is: could Beam not call
> FooAutoValue.builder() dynamically if calling it directly is not possible?
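>
> Something along these lines is what I have in mind (just a sketch of the
> idea, not Beam code; the method name here is made up):
>
>     import java.lang.reflect.Method;
>
>     // Look up the user-declared static builder() reflectively and go
>     // through it, instead of instantiating the package-private generated
>     // Builder class directly.
>     static Object newBuilderReflectively(Class<?> schemaClass)
>             throws ReflectiveOperationException {
>         Method builderMethod = schemaClass.getMethod("builder"); // e.g. FooAutoValue.builder()
>         return builderMethod.invoke(null); // static method, no receiver needed
>     }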
>
>
> On Tue, Oct 26, 2021 at 2:15 PM Reuven Lax <[email protected]> wrote:
>
>> Beam needs to create these elements dynamically when decoding records, so
>> it can't easily call the builder directly.
>>
>> My first guess is that there's a classloader issue here. Flink does some
>> fancy classloader munging, and that might be breaking an assumption in this
>> code. Passing in the correct classloader should hopefully fix this.
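>>
>> To be concrete about what I mean by the correct classloader, here is a
>> rough sketch (assuming the creator is generated with Byte Buddy, which the
>> SchemaCodeGen name in your stack trace suggests; this is not Beam's actual
>> code and the names are made up). Whichever loader is passed to load(...)
>> defines the generated class, and package-private access to
>> AutoValue_App_FooAutoValue.Builder only works if that is the same loader
>> that defined the builder, i.e. the app classloader, not whatever the
>> thread context classloader happens to be under Flink.
>>
>>     import net.bytebuddy.ByteBuddy;
>>     import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
>>
>>     static Class<?> defineCreatorNextToBuilder() {
>>         return new ByteBuddy()
>>                 .subclass(Object.class)
>>                 .name("org.whatever.testing.IllustrativeCreator") // hypothetical name
>>                 .make()
>>                 // Define the generated class in the same loader (and hence
>>                 // the same runtime package) as the AutoValue-generated builder.
>>                 .load(FooAutoValue.class.getClassLoader(),
>>                       ClassLoadingStrategy.Default.INJECTION)
>>                 .getLoaded();
>>     }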
>>
>> Reuven
>>
>>
>> On Tue, Oct 26, 2021 at 10:59 AM Cristian Constantinescu <
>> [email protected]> wrote:
>>
>>> Hi everyone,
>>>
>>> Not sure if anyone is using Beam with the Flink Runner and AutoValue
>>> builders. For me, it doesn't work. I have some questions and a workaround
>>> for anyone in the same boat.
>>>
>>> Beam 2.31, Flink 1.13, AutoValue 1.8.2
>>>
>>> Here's the code:
>>>
>>> package org.whatever.testing;
>>>
>>> import com.google.auto.value.AutoValue;
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.beam.sdk.schemas.AutoValueSchema;
>>> import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
>>> import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
>>> import org.apache.beam.sdk.schemas.transforms.Convert;
>>> import org.apache.beam.sdk.transforms.Create;
>>> import org.apache.beam.sdk.transforms.MapElements;
>>> import org.apache.beam.sdk.values.TypeDescriptor;
>>>
>>> import java.util.Arrays;
>>>
>>> public class App {
>>>
>>>     public static void main(String[] args) {
>>>         var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>
>>>         var p = Pipeline.create(options);
>>>         p
>>>                 .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
>>>                 .apply(Convert.to(FooAutoValue.class))
>>>                 .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
>>>                     System.out.println(i.toString());
>>>                     return i;
>>>                 }))
>>>         ;
>>>         p.run().waitUntilFinish();
>>>     }
>>>     @AutoValue
>>>     @DefaultSchema(AutoValueSchema.class)
>>>     public static abstract class FooAutoValue {
>>>         public abstract String getDummyProp();
>>>
>>> //        @SchemaCreate
>>> //        public static FooAutoValue create(String dummyProp) {
>>> //            return builder()
>>> //                    .setDummyProp(dummyProp)
>>> //                    .build();
>>> //        }
>>>
>>>         public static Builder builder() {
>>>             return new AutoValue_App_FooAutoValue.Builder();
>>>         }
>>>
>>>         @AutoValue.Builder
>>>         public abstract static class Builder {
>>>             public abstract Builder setDummyProp(String newDummyProp);
>>>
>>>             public abstract FooAutoValue build();
>>>         }
>>>     }
>>> }
>>>
>>> Note that it doesn't matter whether FooAutoValue is an inner class or a
>>> top-level class in its own file. For simplicity I'm converting the objects
>>> to the same class here; in the production code the input is of another
>>> type with an equivalent schema.
>>>
>>> And the stack trace:
>>>
>>> Caused by: java.lang.IllegalAccessError: failed to access class
>>> org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
>>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
>>> (org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
>>> module of loader 'app';
>>> org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
>>> unnamed module of loader org.apache.flink.util.ChildFirstClassLoader
>>> @26f4afda)
>>> at org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown Source)
>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
>>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
>>> at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
>>> at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>> at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
>>> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
>>> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
>>> at org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
>>> at org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
>>> at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
>>> at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>>> at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
>>> at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>>> at java.base/java.lang.Thread.run(Thread.java:829)
>>>
>>>
>>> Workaround:
>>> - Uncomment the @SchemaCreate create method
>>> - Compile the code with "-parameters"; if using Maven:
>>>
>>> <plugin>
>>>     <groupId>org.apache.maven.plugins</groupId>
>>>     <artifactId>maven-compiler-plugin</artifactId>
>>>     <version>3.8.1</version>
>>>     <configuration>
>>>         <compilerArgs>
>>>             <arg>
>>>                 -parameters
>>>             </arg>
>>>         </compilerArgs>
>>> ....
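>>>
>>> With the workaround in place, the abstract class carries the creator that
>>> is shown commented out above, i.e. (same code, just uncommented):
>>>
>>>         @SchemaCreate
>>>         public static FooAutoValue create(String dummyProp) {
>>>             // -parameters keeps the name "dummyProp" available via
>>>             // reflection, which (as far as I can tell) is what lets Beam
>>>             // match this parameter to the schema field.
>>>             return builder()
>>>                     .setDummyProp(dummyProp)
>>>                     .build();
>>>         }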
>>>
>>>
>>> My questions:
>>> 1. Why does Beam try to get the generated AutoValue Builder through
>>> reflection using AutoValueUtils.getAutoValueGeneratedName (which builds
>>> "AutoValue_App_FooAutoValue$Builder") instead of calling
>>> FooAutoValue.builder() directly, without reflection?
>>> 2. Given the fancy classloader work Flink does [1]:
>>> AutoValueUtils.createBuilderCreator calls ReflectHelpers.findClassLoader(),
>>> which returns the thread's classloader, in this case a
>>> ChildFirstClassLoader. However, the generated builder is package-private
>>> and was loaded by the app classloader. Is there a reason why Beam registers
>>> the generated creator class on the thread classloader? (Small illustration
>>> below.)
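>>>
>>> To make question 2 concrete (my reading of the JVM rules, not Beam code):
>>> two classes are only in the same runtime package, and therefore allowed
>>> package-private access to each other, if they share both the package name
>>> and the defining classloader.
>>>
>>>     static void illustrateLoaders() {
>>>         // Under Flink these are two different loaders, which is exactly
>>>         // what the IllegalAccessError above prints:
>>>         ClassLoader threadLoader = Thread.currentThread().getContextClassLoader(); // ChildFirstClassLoader
>>>         ClassLoader builderLoader = FooAutoValue.class.getClassLoader();           // loader 'app'
>>>         // A creator class defined through threadLoader cannot touch the
>>>         // package-private AutoValue_App_FooAutoValue.Builder defined by
>>>         // builderLoader, even though both are in package org.whatever.testing.
>>>         System.out.println(threadLoader + " vs " + builderLoader);
>>>     }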
>>>
>>> As per usual, if this is deemed a valid bug, please let me know and I
>>> can eventually make a PR fixing it.
>>>
>>> Thank you,
>>> Cristian
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
>>>
>>
