Bob Lau created FLINK-9259: ------------------------------ Summary: The implementation of the SourceFunction is not serializable. Key: FLINK-9259 URL: https://issues.apache.org/jira/browse/FLINK-9259 Project: Flink Issue Type: Bug Components: DataStream API, Table API & SQL Affects Versions: 1.5.0 Reporter: Bob Lau
The exception stack is as follows: {code:java} //代码占位符 org.apache.flink.api.common.InvalidProgramException: The implementation of the SourceFunction is not serializable. The object probably contains or references non serializable fields. at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416) at com.xxxx.tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40) at org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95) at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812) at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) at com.xxxx.tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338) at com.xxxx.tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447) at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) ... 21 more {code} I've implement the serializable interface in the implementation of the SourceFunction. The code is as follows: {code:java} //代码占位符 @Override public void run(SourceContext<Row> ctx) throws Exception { stream.map(new MapFunction<Row, Row>(){ /** */ private static final long serialVersionUID = -1723722950731109198L; @Override public Row map(Row input) throws Exception { ctx.collect(input); return null; } }); } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)