[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.

2018-04-25 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453430#comment-16453430
 ] 

Bob Lau commented on FLINK-9259:


[~mingleizhang] I wont use parameter like inputStream ( generate last step ) 
,and I wont trans it to new DataStrea.
{code:java}
//代码占位符

public class RowTableSourceFunction implements SourceFunction, 
ResultTypeQueryable {
private final String[] fields;

private final TypeInformation[] typeInfos;

private final DataStream stream;


public RowTableSourceFunction(DataStream stream, String[] fields, 
TypeInformation[] typeInfos){

this.fields = fields;

this.typeInfos = typeInfos;

this.stream = stream;

}


@Override

public void run(SourceContext ctx) throws Exception {

stream.flatMap(new FlatMapFunction(){

@Override

public void flatMap(Row input, Collector out) throws Exception {

ctx.collect(input);

}
}
}



});
{code}
 

> The implementation of the SourceFunction is not serializable. 
> --
>
> Key: FLINK-9259
> URL: https://issues.apache.org/jira/browse/FLINK-9259
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> The exception stack is as follows:
> {code:java}
> //Code placeholder
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416)
> at 
> com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40)
> at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.NotSerializableException: 
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 21 more
> {code}
> I've implement the serializable interface in the implementation of the 
> SourceFunction.
> The code is as follows:
>  
> {code:java}
> //Code placeholder
> @Override
> public void run(SourceContext ctx)
> throws Exception {
> stream.map(new MapFunction(){
> private static final long serialVersionUID = -1723722950731109198L;
> 

[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.

2018-04-25 Thread Bob Lau (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453414#comment-16453414
 ] 

Bob Lau commented on FLINK-9259:


TableSource with RowTypeInfo, and I wont transform the DataStream to 
Table  and registered in table environment.. How can I do this ? 

> The implementation of the SourceFunction is not serializable. 
> --
>
> Key: FLINK-9259
> URL: https://issues.apache.org/jira/browse/FLINK-9259
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> The exception stack is as follows:
> {code:java}
> //Code placeholder
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416)
> at 
> com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40)
> at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.NotSerializableException: 
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 21 more
> {code}
> I've implement the serializable interface in the implementation of the 
> SourceFunction.
> The code is as follows:
>  
> {code:java}
> //Code placeholder
> @Override
> public void run(SourceContext ctx)
> throws Exception {
> stream.map(new MapFunction(){
> private static final long serialVersionUID = -1723722950731109198L;
> @Override
> public Row map(Row input) throws Exception {
> ctx.collect(input);
> return null;
> }
> });
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.

2018-04-25 Thread yuqi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453402#comment-16453402
 ] 

yuqi commented on FLINK-9259:
-

More detail are needed, you can paste more about your code [~bob365]

> The implementation of the SourceFunction is not serializable. 
> --
>
> Key: FLINK-9259
> URL: https://issues.apache.org/jira/browse/FLINK-9259
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> The exception stack is as follows:
> {code:java}
> //Code placeholder
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416)
> at 
> com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40)
> at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.NotSerializableException: 
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 21 more
> {code}
> I've implement the serializable interface in the implementation of the 
> SourceFunction.
> The code is as follows:
>  
> {code:java}
> //Code placeholder
> @Override
> public void run(SourceContext ctx)
> throws Exception {
> stream.map(new MapFunction(){
> private static final long serialVersionUID = -1723722950731109198L;
> @Override
> public Row map(Row input) throws Exception {
> ctx.collect(input);
> return null;
> }
> });
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9259) The implementation of the SourceFunction is not serializable.

2018-04-25 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16453379#comment-16453379
 ] 

mingleizhang commented on FLINK-9259:
-

I would not think it is a bug since {{SourceFunction}} has implements 
{{Serializable}} interface. From your error, I think you implement your own 
{{SourceFunction}}  which contains fields that does not serializable.

> The implementation of the SourceFunction is not serializable. 
> --
>
> Key: FLINK-9259
> URL: https://issues.apache.org/jira/browse/FLINK-9259
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Bob Lau
>Priority: Major
>
> The exception stack is as follows:
> {code:java}
> //Code placeholder
> org.apache.flink.api.common.InvalidProgramException: The implementation of 
> the SourceFunction is not serializable. The object probably contains or 
> references non serializable fields.
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1560)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1472)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1416)
> at 
> com..tysc.job.source.RowDataStreamSpecifyTableSource.getDataStream(RowDataStreamSpecifyTableSource.java:40)
> at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:95)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:135)
> at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:97)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
> at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:782)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
> at 
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
> at 
> com..tysc.job.service.SubmitJobService.submitJobToLocal(SubmitJobService.java:338)
> at com..tysc.rest.JobSubmitController$3.run(JobSubmitController.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.NotSerializableException: 
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:447)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 21 more
> {code}
> I've implement the serializable interface in the implementation of the 
> SourceFunction.
> The code is as follows:
>  
> {code:java}
> //Code placeholder
> @Override
> public void run(SourceContext ctx)
> throws Exception {
> stream.map(new MapFunction(){
> private static final long serialVersionUID = -1723722950731109198L;
> @Override
> public Row map(Row input) throws Exception {
> ctx.collect(input);
> return null;
> }
> });
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)