[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.
[ https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453430#comment-16453430 ] Bob Lau commented on FLINK-9259: [~mingleizhang] I wont use parameter like inputStream ( generate last step ) ,and I wont trans it to new DataStrea. {code:java} //代码占位符 public class RowTableSourceFunction implements SourceFunction, ResultTypeQueryable { private final String[] fields; private final TypeInformation[] typeInfos; private final DataStream stream; public RowTableSourceFunction(DataStream stream, String[] fields, TypeInformation[] typeInfos){ this.fields = fields; this.typeInfos = typeInfos; this.stream = stream; } @Override public void run(SourceContext ctx) throws Exception { stream.flatMap(new FlatMapFunction(){ @Override public void flatMap(Row input, Collector out) throws Exception { ctx.collect(input); } } } }); {code} > The implementation of the SourceFunction is not serializable. > -- > > Key: FLINK-9259 > URL: https://issues.apache.org/jira/browse/FLINK-9259 > Project: Flink > Issue Type: Bug > Components: DataStream API, Table API SQL >Affects Versions: 1.5.0 >Reporter: Bob Lau >Priority: Major > > The exception stack is as follows: > {code:java} > //Code placeholder > org.apache.flink.api.common.InvalidProgramException: The implementation of > the SourceFunction is not serializable. The object probably contains or > references non serializable fields. > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416) > at > com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40) > at > org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) > at > com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338) > at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.NotSerializableException: > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) > ... 21 more > {code} > I've implement the serializable interface in the implementation of the > SourceFunction. > The code is as follows: > > {code:java} > //Code placeholder > @Override > public void run(SourceContext ctx) > throws Exception { > stream.map(new MapFunction
(){ > private static final long serialVersionUID = -1723722950731109198L; >
[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.
[ https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453414#comment-16453414 ] Bob Lau commented on FLINK-9259: TableSource with RowTypeInfo, and I wont transform the DataStream to Table and registered in table environment.. How can I do this ? > The implementation of the SourceFunction is not serializable. > -- > > Key: FLINK-9259 > URL: https://issues.apache.org/jira/browse/FLINK-9259 > Project: Flink > Issue Type: Bug > Components: DataStream API, Table API SQL >Affects Versions: 1.5.0 >Reporter: Bob Lau >Priority: Major > > The exception stack is as follows: > {code:java} > //Code placeholder > org.apache.flink.api.common.InvalidProgramException: The implementation of > the SourceFunction is not serializable. The object probably contains or > references non serializable fields. > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416) > at > com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40) > at > org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) > at > com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338) > at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.NotSerializableException: > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) > ... 21 more > {code} > I've implement the serializable interface in the implementation of the > SourceFunction. > The code is as follows: > > {code:java} > //Code placeholder > @Override > public void run(SourceContext ctx) > throws Exception { > stream.map(new MapFunction(){ > private static final long serialVersionUID = -1723722950731109198L; > @Override > public Row map(Row input) throws Exception { > ctx.collect(input); > return null; > } > }); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.
[ https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453402#comment-16453402 ] yuqi commented on FLINK-9259: - More detail are needed, you can paste more about your code [~bob365] > The implementation of the SourceFunction is not serializable. > -- > > Key: FLINK-9259 > URL: https://issues.apache.org/jira/browse/FLINK-9259 > Project: Flink > Issue Type: Bug > Components: DataStream API, Table API SQL >Affects Versions: 1.5.0 >Reporter: Bob Lau >Priority: Major > > The exception stack is as follows: > {code:java} > //Code placeholder > org.apache.flink.api.common.InvalidProgramException: The implementation of > the SourceFunction is not serializable. The object probably contains or > references non serializable fields. > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416) > at > com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40) > at > org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) > at > com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338) > at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.NotSerializableException: > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) > ... 21 more > {code} > I've implement the serializable interface in the implementation of the > SourceFunction. > The code is as follows: > > {code:java} > //Code placeholder > @Override > public void run(SourceContext ctx) > throws Exception { > stream.map(new MapFunction(){ > private static final long serialVersionUID = -1723722950731109198L; > @Override > public Row map(Row input) throws Exception { > ctx.collect(input); > return null; > } > }); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.
[ https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453379#comment-16453379 ] mingleizhang commented on FLINK-9259: - I would not think it is a bug since {{SourceFunction}} has implements {{Serializable}} interface. From your error, I think you implement your own {{SourceFunction}} which contains fields that does not serializable. > The implementation of the SourceFunction is not serializable. > -- > > Key: FLINK-9259 > URL: https://issues.apache.org/jira/browse/FLINK-9259 > Project: Flink > Issue Type: Bug > Components: DataStream API, Table API SQL >Affects Versions: 1.5.0 >Reporter: Bob Lau >Priority: Major > > The exception stack is as follows: > {code:java} > //Code placeholder > org.apache.flink.api.common.InvalidProgramException: The implementation of > the SourceFunction is not serializable. The object probably contains or > references non serializable fields. > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416) > at > com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40) > at > org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135) > at > org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97) > at > org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308) > at > org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262) > at > com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338) > at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.NotSerializableException: > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81) > ... 21 more > {code} > I've implement the serializable interface in the implementation of the > SourceFunction. > The code is as follows: > > {code:java} > //Code placeholder > @Override > public void run(SourceContext ctx) > throws Exception { > stream.map(new MapFunction(){ > private static final long serialVersionUID = -1723722950731109198L; > @Override > public Row map(Row input) throws Exception { > ctx.collect(input); > return null; > } > }); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)