[ 
https://issues.apache.org/jira/browse/FLINK-16662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088047#comment-17088047
 ] 

Kostas Kloudas commented on FLINK-16662:
----------------------------------------

Hi [~zhanglibing1...@126.com], and thanks for working on this. From the 
discussion, I was left with the impression that we are going to either add a 
test in the PRs for this issue, or, at least, create a new JIRA that will 
target adding a test for these cases.

The PR has no test and there is also no other JIRA for this. Could you please 
create one so that we do not forget about it?

> Blink Planner failed to generate JobGraph for POJO DataStream converting to 
> Table (Cannot determine simple type name)
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-16662
>                 URL: https://issues.apache.org/jira/browse/FLINK-16662
>             Project: Flink
>          Issue Type: Bug
>          Components: Client / Job Submission, Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: chenxyz
>            Assignee: LionelZ
>            Priority: Blocker
>             Fix For: 1.10.1, 1.11.0
>
>
> When using the Blink Planner to convert a POJO DataStream to a Table, Blink 
> generates and compiles the SourceConversion$1 code. If the job is submitted 
> to Flink as a Jar, the UserCodeClassLoader is not used when generating the 
> JobGraph, so the ClassLoader (AppClassLoader) of the compiled code cannot 
> load the POJO class in the Jar package, and the following error is 
> reported:
>  
> {code:java}
> Caused by: org.codehaus.commons.compiler.CompileException: Line 27, Column 174: Cannot determine simple type name "net"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
>     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
>     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
>     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7009)
>     at org.codehaus.janino.UnitCompiler.access$15200(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6425)
>     at org.codehaus.janino.UnitCompiler$21$2.visitCast(UnitCompiler.java:6403)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6403)
>     at org.codehaus.janino.UnitCompiler$21.visitRvalue(UnitCompiler.java:6382)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4105)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
>     at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9150)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9036)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8938)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)
>     at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)
>     at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)
>     at org.codehaus.janino.Java$Cast.accept(Java.java:4887)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
>     at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1503)
>     at org.codehaus.janino.UnitCompiler$6.visitLocalVariableDeclarationStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3511)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
>     ... 20 more
> // generate class
> /* 1 */
> /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> /* 4 */
> /* 5 */        private final Object[] references;
> /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter converter$0;
> /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> /* 8 */
> /* 9 */        public SourceConversion$1(
> /* 10 */            Object[] references,
> /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> /* 14 */          this.references = references;
> /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.PojoConverter) references[0]));
> /* 16 */          this.setup(task, config, output);
> /* 17 */        }
> /* 18 */
> /* 19 */        @Override
> /* 20 */        public void open() throws Exception {
> /* 21 */          super.open();
> /* 22 */
> /* 23 */        }
> /* 24 */
> /* 25 */        @Override
> /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((net.xxxxxxxxxx.Student) element.getValue());
> /* 28 */
> /* 29 */
> /* 30 */
> /* 31 */          output.collect(outElement.replace(in1));
> /* 32 */        }
> /* 33 */
> /* 34 */
> /* 35 */
> /* 36 */        @Override
> /* 37 */        public void close() throws Exception {
> /* 38 */           super.close();
> /* 39 */
> /* 40 */        }
> /* 41 */
> /* 42 */
> /* 43 */      }
> /* 44 */
> {code}
> I think the UserCodeClassLoader should be used when generating the JobGraph, 
> just as it is when generating the Pipeline (StreamGraph).
> The test code is as follows:
>  
> {code:java}
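> // Imports assume Flink 1.10 package names and Lombok on the classpath.
> import lombok.AllArgsConstructor;
> import lombok.Getter;
> import lombok.NoArgsConstructor;
> import lombok.Setter;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableConfig;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sinks.CsvTableSink;
>
> public class App {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings envSet = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSet);
>         // Checkpointing must be enabled to trigger the bug (condition 2 below).
>         env.enableCheckpointing(2 * 60 * 1000);
>         TableConfig config = tableEnv.getConfig();
>         config.setIdleStateRetentionTime(Time.hours(24), Time.hours(25));
>         // Source that emits a single POJO record.
>         DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
>             @Override
>             public void run(SourceContext<Student> ctx) throws Exception {
>                 ctx.collect(new Student(1, "Tom"));
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         // Converting the POJO DataStream to a Table produces SourceConversion$1.
>         tableEnv.createTemporaryView("student", source, "id, name");
>         Table table = tableEnv.sqlQuery("select id, name from student");
>         CsvTableSink sink = new CsvTableSink("/data/student", ",", 10, FileSystem.WriteMode.OVERWRITE);
>         String[] fieldNames = {"id", "name"};
>         TypeInformation[] fieldTypes = {Types.INT, Types.STRING};
>         tableEnv.registerTableSink("student_sink", fieldNames, fieldTypes, sink);
>         table.insertInto("student_sink");
>         env.execute("Test_Jar");
>     }
>     // POJO that only exists in the user Jar.
>     @Getter
>     @Setter
>     @NoArgsConstructor
>     @AllArgsConstructor
>     public static class Student {
>         private Integer id;
>         private String name;
>     }
> }
> {code}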
> To reproduce this bug, the following conditions must be met:
> 1. A POJO DataStream is converted to a Table.
> 2. Checkpointing is enabled; StreamingJobGraphGenerator#preValidate() checks 
> whether checkpointing is enabled.
> 3. The program is packaged into a Jar and submitted to Flink, or 
> PackagedProgramUtils.createJobGraph is invoked directly to create the 
> JobGraph from the Jar program.
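
For illustration only: one common JDK-level way to make a particular class loader visible to code that compiles or loads classes is to install it as the thread context class loader for the duration of a call and restore the previous one afterwards. The sketch below uses only the JDK. `ContextClassLoaderSketch` and `callWithContextClassLoader` are hypothetical names, the empty `URLClassLoader` merely stands in for a user code class loader, and this is not necessarily how the fix for this issue is implemented in Flink.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of Flink.
class ContextClassLoaderSketch {

    // Runs the action with the given loader installed as the thread context
    // class loader, then restores whatever loader was set before.
    static <T> T callWithContextClassLoader(ClassLoader loader, Callable<T> action)
            throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Always restore the previous loader, even if the action throws.
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Stand-in for a user code class loader; a real one would list the job Jar URLs.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], before)) {
            ClassLoader seen = callWithContextClassLoader(
                    userLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println(seen == userLoader);
            System.out.println(Thread.currentThread().getContextClassLoader() == before);
        }
    }
}
```

Both lines print `true`: the action sees the stand-in loader, and the original loader is back in place once the call returns. Whether the code generation step in the planner actually consults the context class loader is an assumption here and would need to be checked against the Flink sources.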



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
