Sure, it helps a lot. After update to Flink 1.10.1, problem solved! Thanks, Bin
Fabian Hueske <fhue...@gmail.com> 于2020年6月17日周三 下午4:32写道: > The exception is thrown when the StreamGraph is translated to a JobGraph. > The translation logic has a switch. If checkpointing is enabled, the Java > code generated by the optimizer is directly compiled in the client (in > contrast to compiling it on the TaskManagers when the operators are > started). > The compiler needs access to the UDF classes but the classloader that's > being used doesn't know about the UDF classes. > > The classloader that's used for the compilation is generated by > PackagedProgram. > You can configure the PackagedProgram with the right user code JAR URLs > which contain the UDF classes. > Alternatively, you can try to inject another classloader into > PackagedProgram using Reflection (but that's a rather hacky approach). > > Hope this helps. > > Cheers, Fabian > > Am Mi., 17. Juni 2020 um 06:48 Uhr schrieb Jark Wu <imj...@gmail.com>: > > > Why compile JobGraph yourself? This is really an internal API and may > cause > > problems. > > Could you try to use `flink run` command [1] to submit your user jar > > instead? > > > > Btw, what's your Flink version? If you are using Flink 1.10.0, could you > > try to use 1.10.1? > > > > Best, > > Jark > > > > On Wed, 17 Jun 2020 at 12:41, 杜斌 <dubin...@gmail.com> wrote: > > > > > Thanks for the reply, > > > Here is the simple java program that re-produce the problem: > > > 1. code for the application: > > > > > > import org.apache.flink.api.java.tuple.Tuple2; > > > import org.apache.flink.streaming.api.datastream.DataStream; > > > import > > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > > > import org.apache.flink.table.api.EnvironmentSettings; > > > import org.apache.flink.table.api.Table; > > > import org.apache.flink.table.api.java.StreamTableEnvironment; > > > import org.apache.flink.types.Row; > > > > > > > > > import java.util.Arrays; > > > > > > public class Test { > > > public static void main(String[] args) throws Exception { > > > > > > StreamExecutionEnvironment env = > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > /** > > > * If enable checkpoint, blink planner will failed > > > */ > > > env.enableCheckpointing(1000); > > > > > > EnvironmentSettings envSettings = > > EnvironmentSettings.newInstance() > > > // .useBlinkPlanner() // compile fail > > > .useOldPlanner() // compile success > > > .inStreamingMode() > > > .build(); > > > StreamTableEnvironment tEnv = > > > StreamTableEnvironment.create(env, envSettings); > > > DataStream<Order> orderA = env.fromCollection(Arrays.asList( > > > new Order(1L, "beer", 3), > > > new Order(1L, "diaper", 4), > > > new Order(3L, "beer", 2) > > > )); > > > > > > // Table table = tEnv.fromDataStream(orderA); > > > tEnv.createTemporaryView("orderA", orderA); > > > Table res = tEnv.sqlQuery("SELECT * FROM orderA"); > > > DataStream<Tuple2<Boolean, Row>> ds = > > > tEnv.toRetractStream(res, Row.class); > > > ds.print(); > > > env.execute(); > > > > > > } > > > > > > public static class Order { > > > public long user; > > > public String product; > > > public int amount; > > > > > > public Order(long user, String product, int amount) { > > > this.user = user; > > > this.product = product; > > > this.amount = amount; > > > } > > > > > > public long getUser() { > > > return user; > > > } > > > > > > public void setUser(long user) { > > > this.user = user; > > > } > > > > > > public String getProduct() { > > > return product; > > > } > > > > > > public void setProduct(String product) { > > > this.product = product; > > > } > > > > > > public int getAmount() { > > > return amount; > > > } > > > > > > public void setAmount(int amount) { > > > this.amount = amount; > > > } > > > } > > > } > > > > > > 2. mvn clean package to a jar file > > > 3. then we use the following code to produce a jobgraph: > > > > > > PackagedProgram program = > > > PackagedProgram.newBuilder() > > > .setJarFile(userJarPath) > > > .setUserClassPaths(classpaths) > > > .setEntryPointClassName(userMainClass) > > > .setConfiguration(configuration) > > > > > > .setSavepointRestoreSettings((descriptor.isRecoverFromSavepoint() && > > > descriptor.getSavepointPath() != null && > > > !descriptor.getSavepointPath().equals("")) ? > > > > > > SavepointRestoreSettings.forPath(descriptor.getSavepointPath(), > > > descriptor.isAllowNonRestoredState()) : > > > SavepointRestoreSettings.none()) > > > .setArguments(userArgs) > > > .build(); > > > > > > > > > JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, > > > configuration, 4, true); > > > > > > 4. If we use blink planner & enable checkpoint, the compile will > failed. > > > For the others, the compile success. > > > > > > Thanks, > > > Bin > > > > > > Jark Wu <imj...@gmail.com> 于2020年6月17日周三 上午10:42写道: > > > > > > > Hi, > > > > > > > > Which Flink version are you using? Are you using SQL CLI? Could you > > share > > > > your table/sql program? > > > > We did fix some classloading problems around SQL CLI, e.g. > FLINK-18302 > > > > > > > > Best, > > > > Jark > > > > > > > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <dubin...@gmail.com> wrote: > > > > > > > > > add the full stack trace here: > > > > > > > > > > > > > > > Caused by: > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: > > > > > org.apache.flink.api.common.InvalidProgramException: Table program > > > cannot > > > > > be compiled. This is a bug. Please file an issue. > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) > > > > > ... 14 more > > > > > Caused by: org.apache.flink.api.common.InvalidProgramException: > Table > > > > > program cannot be compiled. This is a bug. Please file an issue. > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > > > > > ... 17 more > > > > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, > > > Column > > > > > 46: Cannot determine simple type name "org" > > > > > at > > > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > > > > at > > > > > > > > > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520) > > > > > at > org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486) > > > > > at > > org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215) > > > > > at > > > > > > > > > > > > > > > > > > > > org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394) > > > > > at > > > > > > > > > > > > > > > > > > > > org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389) > > > > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917) > > > > > at > > > org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389) > > > > > at > > > org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382) > > > > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916) > > > > > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382) > > > > > at > > org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215) > > > > > at > > > > > > > > > > > > > > > org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886) > > > > > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455) > > > > > at org.codehaus.janino.IClass.getIMethods(IClass.java:260) > > > > > at org.codehaus.janino.IClass.getIMethods(IClass.java:237) > > > > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492) > > > > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > > > > > at > org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > > > > > at > > > > > > > > > > > > > > > > > > > > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > > > > > at > > > > > > > > > > > > > > > > > > > > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > > > > > at > > > > > > > > > > > > > > > > > > > > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > > > > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > > > > > at > > org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > > > > > at > > > > > > > > > > > > > > > > > > > > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > > > > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > > > > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > > > > > at > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) > > > > > ... 23 more > > > > > > > > > > 杜斌 <dubin...@gmail.com> 于2020年6月17日周三 上午10:29写道: > > > > > > > > > > > Hi, > > > > > > Need help on this issue, here is what Flink reported when I > enable > > > the > > > > > > checkpoint setting of the StreamExecutionEnvironment: > > > > > > > > > > > > /* 1 */ > > > > > > /* 2 */ public class SourceConversion$1 extends > > > > > > > > > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator > > > > > > /* 3 */ implements > > > > > > org.apache.flink.streaming.api.operators.OneInputStreamOperator { > > > > > > /* 4 */ > > > > > > /* 5 */ private final Object[] references; > > > > > > /* 6 */ private transient > > > > > > > org.apache.flink.table.dataformat.DataFormatConverters.RowConverter > > > > > > converter$0; > > > > > > /* 7 */ private final > > > > > > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > > > > outElement = > > > > > > new > > > org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > > > > > > /* 8 */ > > > > > > /* 9 */ public SourceConversion$1( > > > > > > /* 10 */ Object[] references, > > > > > > /* 11 */ > > > org.apache.flink.streaming.runtime.tasks.StreamTask > > > > > > task, > > > > > > /* 12 */ > > org.apache.flink.streaming.api.graph.StreamConfig > > > > > > config, > > > > > > /* 13 */ > org.apache.flink.streaming.api.operators.Output > > > > > > output) throws Exception { > > > > > > /* 14 */ this.references = references; > > > > > > /* 15 */ converter$0 = > > > > > > > > > (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) > > > > > > references[0])); > > > > > > /* 16 */ this.setup(task, config, output); > > > > > > /* 17 */ } > > > > > > /* 18 */ > > > > > > /* 19 */ @Override > > > > > > /* 20 */ public void open() throws Exception { > > > > > > /* 21 */ super.open(); > > > > > > /* 22 */ > > > > > > /* 23 */ } > > > > > > /* 24 */ > > > > > > /* 25 */ @Override > > > > > > /* 26 */ public void > > > > > > > > > > > > > > > > > > > > > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > > > > > > element) throws Exception { > > > > > > /* 27 */ org.apache.flink.table.dataformat.BaseRow in1 = > > > > > > (org.apache.flink.table.dataformat.BaseRow) > > > > > > (org.apache.flink.table.dataformat.BaseRow) > > > > > > converter$0.toInternal((org.apache.flink.types.Row) > > > > element.getValue()); > > > > > > /* 28 */ > > > > > > /* 29 */ > > > > > > /* 30 */ > > > > > > /* 31 */ output.collect(outElement.replace(in1)); > > > > > > /* 32 */ } > > > > > > /* 33 */ > > > > > > /* 34 */ > > > > > > /* 35 */ > > > > > > /* 36 */ @Override > > > > > > /* 37 */ public void close() throws Exception { > > > > > > /* 38 */ super.close(); > > > > > > /* 39 */ > > > > > > /* 40 */ } > > > > > > /* 41 */ > > > > > > /* 42 */ > > > > > > /* 43 */ } > > > > > > /* 44 */ > > > > > > > > > > > > Exception in thread "main" > > > org.apache.flink.util.FlinkRuntimeException: > > > > > > org.apache.flink.api.common.InvalidProgramException: Table > program > > > > cannot > > > > > > be compiled. This is a bug. Please file an issue. > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85) > > > > > > > > > > > > The janino will compile successfully when I remove the checkpoint > > > > setting > > > > > > of the env. > > > > > > > > > > > > Can anyone help on this? > > > > > > > > > > > > Thanks, > > > > > > Bin > > > > > > > > > > > > > > > > > > > > >