Hi Radu, both emails reached the mailing list :)
You cannot reference DataSets or DataStreams from inside user-defined functions. Both are just abstractions for a data set or stream, so the elements are not actually inside the set. We also don't have any support for mixing the DataSet and DataStream APIs. For your use case, I would recommend using a RichFlatMapFunction and reading the text file in its open() method.
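A minimal sketch of what that could look like, assuming the fixed set fits into memory on each task; the class name, the file path, and the concrete per-element operation are placeholders, not something confirmed in this thread:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical example: combines each stream element with a fixed set
// that is loaded once per parallel instance when the function is opened.
public class EnrichWithFixedSet extends RichFlatMapFunction<String, String> {

    private final String path;              // e.g. "/tmp/fixedset.txt" (placeholder)
    private transient Set<String> fixedSet; // rebuilt in open(), so never serialized

    public EnrichWithFixedSet(String path) {
        this.path = path;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Plain Java I/O; the file must be readable from every TaskManager.
        fixedSet = new HashSet<>(
                Files.readAllLines(Paths.get(path), StandardCharsets.UTF_8));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Placeholder operation: add the event to this instance's copy of the
        // set and emit something derived from both.
        fixedSet.add(value);
        out.collect(value + " (set now contains " + fixedSet.size() + " elements)");
    }
}

You would attach it with stream.flatMap(new EnrichWithFixedSet("/tmp/fixedset.txt")). Note that every parallel instance holds its own copy of the set, so additions are neither shared between instances nor part of any checkpointed state.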
On Tue, Dec 1, 2015 at 5:03 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hello,
>
> I am not sure if this message was received on the user list; if so, I
> apologize for the duplicate.
>
> I have the following scenario:
>
> - Reading a fixed set:
>   DataStream<String> fixedset = env.readTextFile(…
> - Reading a continuous stream of data:
>   DataStream<String> stream = ….
>
> I would need, for each event read from the continuous stream, to apply
> some operations on it and on the fixed set together.
>
> I have tried something like:
>
> final myObject.referenceStaticSet = fixedset;
> stream.map(new MapFunction<String, String>() {
>     @Override
>     public String map(String arg0) throws Exception {
>         // for example: final string2add = arg0;
>         // the goal of the function below would be to add string2add to the fixed set
>         myObject.referenceStaticSet = myObject.referenceStaticSet.flatMap(
>             new FlatMapFunction<String, String>() {
>                 @Override
>                 public void flatMap(String arg0, Collector<String> arg1) {
>                     // for example, adding the string2add object to the fixed set:
>                     arg1.collect(string2add);
>                 }
>         …
>     }
>
> However, I get an exception (Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: ) that the object is
> not serializable (Object MyClass$3@a71081 not serializable).
>
> Looking into this, I see that the issue is that the DataStream<> is not
> serializable. What would be the solution to this?
>
> As I mentioned before, I would like, for each event from the continuous
> stream, to use the initial fixed set, add the event to it, and apply an
> operation.
>
> Stephan was mentioning at some point a possibility to create a DataSet
> and launch a batch job while operating in stream mode. In case this is
> possible, can you give me a reference for it? It might be a good solution
> here: I am thinking that I could keep the fixed set as a DataSet and, as
> each new event comes in, transform it into a DataSet, join it with the
> reference set, and apply an operation.
>
> Regards,
>
> Dr. Radu Tudoran
> Research Engineer
> IT R&D Division
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> European Research Center
> Riesstrasse 25, 80992 München
>
> E-mail: radu.tudo...@huawei.com
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> From: Vieru, Mihail [mailto:mihail.vi...@zalando.de]
> Sent: Tuesday, December 01, 2015 4:55 PM
> To: user@flink.apache.org
> Subject: NPE with Flink Streaming from Kafka
>
> Hi,
>
> we get the following NullPointerException after ~50 minutes when running
> a streaming job with windowing and state that reads data from Kafka and
> writes the result to the local FS.
>
> There are around 170 million messages to be processed; Flink 0.10.1 stops
> at ~8 million.
>
> Flink runs locally, started with the "start-cluster-streaming.sh" script.
>
> 12/01/2015 15:06:24 Job execution switched to status RUNNING.
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to SCHEDULED
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to DEPLOYING
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to SCHEDULED
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to DEPLOYING
> 12/01/2015 15:06:24 Source: Custom Source -> Map -> Map(1/1) switched to RUNNING
> 12/01/2015 15:06:24 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to RUNNING
> 12/01/2015 15:56:08 Fast TumblingTimeWindows(5000) of Reduce at main(ItemPriceAvgPerOrder.java:108) -> Sink: Unnamed(1/1) switched to CANCELED
> 12/01/2015 15:56:08 Source: Custom Source -> Map -> Map(1/1) switched to FAILED
> java.lang.Exception
>     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:115)
>     at org.I0Itec.zkclient.ZkClient$10.call(ZkClient.java:817)
>     at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>     at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
>     at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
>     at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:332)
>     at kafka.utils.ZkUtils.updatePersistentPath(ZkUtils.scala)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.setOffsetInZooKeeper(ZookeeperOffsetHandler.java:112)
>     at org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler.commit(ZookeeperOffsetHandler.java:80)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer$PeriodicOffsetCommitter.run(FlinkKafkaConsumer.java:632)
>
> Any ideas on what could cause this behaviour?
>
> Best,
> Mihail