Sure, it helps a lot. After updating to Flink 1.10.1, the problem is solved!
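
For readers of the archive, the classloader visibility issue that Fabian describes in the quoted reply below can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not Flink's actual code path: `UserCodeClassLoaderSketch`, `compileStandInUdf`, `canLoad` and the class `MyUdf` are hypothetical names, and a `URLClassLoader` stands in for the classloader that PackagedProgram builds from the user code JAR URLs. It needs a JDK (not just a JRE) because it compiles the stand-in class at runtime.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class UserCodeClassLoaderSketch {

    /** Compiles a tiny stand-in "UDF" class into a temp directory (hypothetical user code). */
    static Path compileStandInUdf() throws Exception {
        Path dir = Files.createTempDirectory("user-code");
        Path source = dir.resolve("MyUdf.java");
        Files.write(source, "public class MyUdf {}".getBytes(StandardCharsets.UTF_8));
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler; run on a JDK");
        }
        int status = compiler.run(null, null, null, "-d", dir.toString(), source.toString());
        if (status != 0) {
            throw new IllegalStateException("Compiling the stand-in UDF failed");
        }
        return dir;
    }

    /** Returns true if a class loader built from the given URLs can resolve the class. */
    static boolean canLoad(URL[] userCodeUrls, String className) throws Exception {
        ClassLoader parent = UserCodeClassLoaderSketch.class.getClassLoader();
        try (URLClassLoader loader = new URLClassLoader(userCodeUrls, parent)) {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            // The loader does not know about the user code, so resolution fails.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        URL userCode = compileStandInUdf().toUri().toURL();
        System.out.println("without user code URLs: " + canLoad(new URL[0], "MyUdf"));
        System.out.println("with user code URLs:    " + canLoad(new URL[] {userCode}, "MyUdf"));
    }
}
```

The first check should report false and the second true, which mirrors the situation in this thread: the generated code only compiles when the classloader used for compilation was given the URLs that contain the user classes.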

Thanks,
Bin

Fabian Hueske <fhue...@gmail.com> wrote on Wed, Jun 17, 2020 at 4:32 PM:

> The exception is thrown when the StreamGraph is translated to a JobGraph.
> The translation logic has a switch. If checkpointing is enabled, the Java
> code generated by the optimizer is directly compiled in the client (in
> contrast to compiling it on the TaskManagers when the operators are
> started).
> The compiler needs access to the UDF classes but the classloader that's
> being used doesn't know about the UDF classes.
>
> The classloader that's used for the compilation is generated by
> PackagedProgram.
> You can configure the PackagedProgram with the right user code JAR URLs
> which contain the UDF classes.
> Alternatively, you can try to inject another classloader into
> PackagedProgram using Reflection (but that's a rather hacky approach).
>
> Hope this helps.
>
> Cheers, Fabian
>
> On Wed, 17 June 2020 at 06:48, Jark Wu <imj...@gmail.com> wrote:
>
> > Why compile the JobGraph yourself? This is really an internal API and may
> > cause problems.
> > Could you try to use the `flink run` command [1] to submit your user jar
> > instead?
> >
> > Btw, what's your Flink version? If you are using Flink 1.10.0, could you
> > try to use 1.10.1?
> >
> > Best,
> > Jark
> >
> > On Wed, 17 Jun 2020 at 12:41, 杜斌 <dubin...@gmail.com> wrote:
> >
> > > Thanks for the reply.
> > > Here is a simple Java program that reproduces the problem:
> > > 1. Code for the application:
> > >
> > > import org.apache.flink.api.java.tuple.Tuple2;
> > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.table.api.EnvironmentSettings;
> > > import org.apache.flink.table.api.Table;
> > > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > > import org.apache.flink.types.Row;
> > >
> > >
> > > import java.util.Arrays;
> > >
> > > public class Test {
> > >     public static void main(String[] args) throws Exception {
> > >
> > >         StreamExecutionEnvironment env =
> > >                 StreamExecutionEnvironment.getExecutionEnvironment();
> > >         /**
> > >          * If checkpointing is enabled, the Blink planner fails.
> > >          */
> > >         env.enableCheckpointing(1000);
> > >
> > >         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> > > //                .useBlinkPlanner() // compilation fails
> > >                 .useOldPlanner() // compilation succeeds
> > >                 .inStreamingMode()
> > >                 .build();
> > >         StreamTableEnvironment tEnv =
> > >                 StreamTableEnvironment.create(env, envSettings);
> > >         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
> > >                 new Order(1L, "beer", 3),
> > >                 new Order(1L, "diaper", 4),
> > >                 new Order(3L, "beer", 2)
> > >         ));
> > >
> > > //        Table table = tEnv.fromDataStream(orderA);
> > >         tEnv.createTemporaryView("orderA", orderA);
> > >         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> > >         DataStream<Tuple2<Boolean, Row>> ds =
> > > tEnv.toRetractStream(res, Row.class);
> > >         ds.print();
> > >         env.execute();
> > >
> > >     }
> > >
> > >     public static class Order {
> > >         public long user;
> > >         public String product;
> > >         public int amount;
> > >
> > >         public Order(long user, String product, int amount) {
> > >             this.user = user;
> > >             this.product = product;
> > >             this.amount = amount;
> > >         }
> > >
> > >         public long getUser() {
> > >             return user;
> > >         }
> > >
> > >         public void setUser(long user) {
> > >             this.user = user;
> > >         }
> > >
> > >         public String getProduct() {
> > >             return product;
> > >         }
> > >
> > >         public void setProduct(String product) {
> > >             this.product = product;
> > >         }
> > >
> > >         public int getAmount() {
> > >             return amount;
> > >         }
> > >
> > >         public void setAmount(int amount) {
> > >             this.amount = amount;
> > >         }
> > >     }
> > > }
> > >
> > > 2. Run `mvn clean package` to build a jar file.
> > > 3. Then we use the following code to produce a JobGraph:
> > >
> > > PackagedProgram program =
> > >         PackagedProgram.newBuilder()
> > >                 .setJarFile(userJarPath)
> > >                 .setUserClassPaths(classpaths)
> > >                 .setEntryPointClassName(userMainClass)
> > >                 .setConfiguration(configuration)
> > >                 .setSavepointRestoreSettings(
> > >                         (descriptor.isRecoverFromSavepoint()
> > >                                 && descriptor.getSavepointPath() != null
> > >                                 && !descriptor.getSavepointPath().equals(""))
> > >                                 ? SavepointRestoreSettings.forPath(
> > >                                         descriptor.getSavepointPath(),
> > >                                         descriptor.isAllowNonRestoredState())
> > >                                 : SavepointRestoreSettings.none())
> > >                 .setArguments(userArgs)
> > >                 .build();
> > >
> > > JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program,
> > >         configuration, 4, true);
> > >
> > > 4. If we use the Blink planner and enable checkpointing, the compilation
> > > fails. In all other cases, the compilation succeeds.
> > >
> > > Thanks,
> > > Bin
> > >
> > > Jark Wu <imj...@gmail.com> wrote on Wed, Jun 17, 2020 at 10:42 AM:
> > >
> > > > Hi,
> > > >
> > > > Which Flink version are you using? Are you using SQL CLI? Could you
> > > > share your table/sql program?
> > > > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <dubin...@gmail.com> wrote:
> > > >
> > > > > Adding the full stack trace here:
> > > > >
> > > > > Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > > >     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > > >     ... 14 more
> > > > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > >     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > > >     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > > >     ... 17 more
> > > > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
> > > > >     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > >     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > > >     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > > >     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > > >     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > > >     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > > >     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > > >     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > > >     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > > >     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > > >     at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > > >     at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > > >     at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > > >     at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > > >     at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > > >     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > > >     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > > >     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > > >     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > > >     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > > >     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > > >     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > > >     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > > >     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > > >     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > > >     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > > >     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > > >     ... 23 more
> > > > >
> > > > > 杜斌 <dubin...@gmail.com> wrote on Wed, Jun 17, 2020 at 10:29 AM:
> > > > >
> > > > > > Hi,
> > > > > > I need help with this issue. Here is what Flink reported when I
> > > > > > enabled checkpointing on the StreamExecutionEnvironment:
> > > > > >
> > > > > > /* 1 */
> > > > > > /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> > > > > > /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> > > > > > /* 4 */
> > > > > > /* 5 */        private final Object[] references;
> > > > > > /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
> > > > > > /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> > > > > > /* 8 */
> > > > > > /* 9 */        public SourceConversion$1(
> > > > > > /* 10 */            Object[] references,
> > > > > > /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> > > > > > /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> > > > > > /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> > > > > > /* 14 */          this.references = references;
> > > > > > /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
> > > > > > /* 16 */          this.setup(task, config, output);
> > > > > > /* 17 */        }
> > > > > > /* 18 */
> > > > > > /* 19 */        @Override
> > > > > > /* 20 */        public void open() throws Exception {
> > > > > > /* 21 */          super.open();
> > > > > > /* 22 */
> > > > > > /* 23 */        }
> > > > > > /* 24 */
> > > > > > /* 25 */        @Override
> > > > > > /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> > > > > > /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> > > > > > /* 28 */
> > > > > > /* 29 */
> > > > > > /* 30 */
> > > > > > /* 31 */          output.collect(outElement.replace(in1));
> > > > > > /* 32 */        }
> > > > > > /* 33 */
> > > > > > /* 34 */
> > > > > > /* 35 */
> > > > > > /* 36 */        @Override
> > > > > > /* 37 */        public void close() throws Exception {
> > > > > > /* 38 */           super.close();
> > > > > > /* 39 */
> > > > > > /* 40 */        }
> > > > > > /* 41 */
> > > > > > /* 42 */
> > > > > > /* 43 */      }
> > > > > > /* 44 */
> > > > > >
> > > > > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > > >     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> > > > > >     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> > > > > >     at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> > > > > >     at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> > > > > >     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> > > > > >     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> > > > > >     at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> > > > > >     at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> > > > > >     at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> > > > > >     at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> > > > > >     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> > > > > >     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
> > > > > >
> > > > > > Janino compiles the code successfully when I remove the checkpoint
> > > > > > setting from the env.
> > > > > >
> > > > > > Can anyone help with this?
> > > > > >
> > > > > > Thanks,
> > > > > > Bin
> > > > > >
> > > > >
> > > >
> > >
> >
>
