Hi everyone,

Is anyone using Beam with the Flink Runner and AutoValue builders? For me,
the combination doesn't work out of the box. I have some questions and a
workaround for anyone in the same boat.

Versions: Beam 2.31, Flink 1.13, AutoValue 1.8.2

Here's the code:

package org.whatever.testing;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Arrays;

public class App {

    public static void main(String[] args) {
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        var p = Pipeline.create(options);
        p.apply(Create.of(Arrays.asList(
                        FooAutoValue.builder().setDummyProp("dummy").build())))
                .apply(Convert.to(FooAutoValue.class))
                .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class))
                        .via(i -> {
                            System.out.println(i.toString());
                            return i;
                        }));
        p.run().waitUntilFinish();
    }

    @AutoValue
    @DefaultSchema(AutoValueSchema.class)
    public static abstract class FooAutoValue {
        public abstract String getDummyProp();

//        @SchemaCreate
//        public static FooAutoValue create(String dummyProp) {
//            return builder()
//                    .setDummyProp(dummyProp)
//                    .build();
//        }

        public static Builder builder() {
            return new AutoValue_App_FooAutoValue.Builder();
        }

        @AutoValue.Builder
        public abstract static class Builder {
            public abstract Builder setDummyProp(String newDummyProp);

            public abstract FooAutoValue build();
        }
    }
}

Note that it doesn't matter whether FooAutoValue is an inner class or a
top-level class in its own file. For simplicity I'm converting the objects
to the same class here; in prod code the input is of another type with an
equivalent schema.

And the stack trace:

Caused by: java.lang.IllegalAccessError: failed to access class
org.whatever.testing.AutoValue_App_FooAutoValue$Builder from class
org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1
(org.whatever.testing.AutoValue_App_FooAutoValue$Builder is in unnamed
module of loader 'app';
org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1 is in
unnamed module of loader org.apache.flink.util.ChildFirstClassLoader
@26f4afda)
    at org.whatever.testing.SchemaUserTypeCreator$SchemaCodeGen$2thLkIj1.create(Unknown Source)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.fromRow(FromRowUsingCreator.java:96)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:66)
    at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply(FromRowUsingCreator.java:45)
    at org.apache.beam.sdk.schemas.SchemaCoder.decode(SchemaCoder.java:129)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:101)
    at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:95)
    at org.apache.beam.sdk.transforms.Create$Values$BytesReader.advanceImpl(Create.java:518)
    at org.apache.beam.sdk.transforms.Create$Values$BytesReader.startImpl(Create.java:500)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:252)
    at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:80)
    at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:829)


Workaround:
- Uncomment the @SchemaCreate factory method
- Compile the code with "-parameters"; if using Maven:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.8.1</version>
    <configuration>
        <compilerArgs>
            <arg>-parameters</arg>
        </compilerArgs>
....
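
The -parameters flag matters because, as far as I can tell, Beam matches the
@SchemaCreate factory's parameter names against the schema field names at
runtime, and without the flag javac replaces them with synthetic names like
arg0. A minimal sketch (plain JDK, no Beam involved) showing what reflection
sees either way:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class ParametersDemo {
    // Stand-in for the @SchemaCreate factory; only the parameter name matters here.
    public static void create(String dummyProp) {}

    public static void main(String[] args) throws Exception {
        Method m = ParametersDemo.class.getMethod("create", String.class);
        Parameter p = m.getParameters()[0];
        // Compiled with -parameters this prints "dummyProp: true";
        // without it javac emits a synthetic name, typically "arg0: false".
        System.out.println(p.getName() + ": " + p.isNamePresent());
    }
}
```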


My questions:
1. Why does Beam try to obtain the generated AutoValue builder through
reflection, via AutoValueUtils.getAutoValueGeneratedName (which builds
"AutoValue_App_FooAutoValue$Builder"), instead of calling
FooAutoValue.builder() directly without reflection?
2. Given the fancy classloader work Flink does [1]: in
AutoValueUtils.createBuilderCreator there is a call to
ReflectHelpers.findClassLoader(), which returns the thread classloader, in
this case a ChildFirstClassLoader. However, the builder is package-private
and lives in the app classloader. Is there a reason why Beam registers the
generated coder on the thread classloader?
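
For context on question 1, here is a rough reconstruction of the name
mangling I believe the lookup performs; the helper below is mine, not
Beam's, and the nested classes only mimic the layout from my example:

```java
public class GeneratedNameSketch {
    // Mimics the layout from the post: App outer class, FooAutoValue nested in it.
    static class App {
        static class FooAutoValue {}
    }

    // Hypothetical reconstruction: take the binary class name, flatten the
    // nested-class '$' separators to '_', and prepend "AutoValue_".
    static String generatedName(Class<?> clazz) {
        String name = clazz.getName(); // e.g. "org.whatever.testing.App$FooAutoValue"
        int lastDot = name.lastIndexOf('.');
        String pkg = lastDot >= 0 ? name.substring(0, lastDot + 1) : "";
        String simple = name.substring(lastDot + 1).replace('$', '_');
        return pkg + "AutoValue_" + simple;
    }

    public static void main(String[] args) {
        // Prints "AutoValue_GeneratedNameSketch_App_FooAutoValue" for the
        // nested class above; Beam then appends "$Builder" and loads the
        // result reflectively, which is where the IllegalAccessError bites.
        System.out.println(generatedName(App.FooAutoValue.class));
    }
}
```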

As usual, if this is deemed a valid bug, please let me know and I can put
together a PR to fix it.

Thank you,
Cristian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/
