Alright, it works perfectly.
I checked that my Python methods are executed properly inside the
RichAllWindowFunction.
Thanks a lot!

P.S. For those who wonder why I use Jep, see
https://sushant-hiray.me/posts/python-in-scala-stack/ for the idea of
using Python inside Java through Jep instead of Jython and JyNI.


------------
import jep.Jep
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  // Jep is not serializable, so the instance is created in open() on the
  // task itself instead of being shipped with the serialized closure.
  var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    jep = Some(new Jep())
    jep.foreach(_.runScript("prediction.py"))
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }

  // Shut down the embedded interpreter when the task closes.
  override def close(): Unit = jep.foreach(_.close())
}
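
For the record, the other option Chesnay suggests below (lazy instantiation
within apply) would look roughly like this sketch. I haven't run this
variant: LazyWindowFunction and the predict function are placeholder names,
and it assumes prediction.py defines a top-level predict(values) that Jep's
invoke() can call.
--------
import jep.Jep
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class LazyWindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
  // Transient: never serialized with the closure, so the
  // NotSerializableException from my first attempt cannot occur.
  @transient private var jep: Jep = _

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    if (jep == null) {
      // First call on this task: the interpreter is created on the task's
      // own thread, which also fits Jep's one-thread-per-instance rule.
      jep = new Jep()
      jep.runScript("prediction.py")
    }
    // Hypothetical call into the script; adjust to your actual function.
    val result = jep.invoke("predict", iter.toSeq.asJava)
    out.collect(String.valueOf(result))
  }
}
--------
I still prefer the open()/close() version above, since close() gives a clean
place to shut the interpreter down.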

> On Mar 15, 2017, at 1:27 AM, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Hey,
> 
> Naturally this implies that your script is available on all nodes, so 
> you will have to distribute it manually.
> 
> On 14.03.2017 17:23, Chesnay Schepler wrote:
>> Hello,
>> 
>> I would suggest implementing the RichWindowFunction instead and instantiating 
>> Jep within open(), or maybe doing some lazy instantiation within apply().
>> 
>> Regards,
>> Chesnay
>> 
>> On 14.03.2017 15:47, 김동원 wrote:
>>> Hi all,
>>> 
>>> What is the proper way to call a Python function in WindowFunction.apply()?
>>> 
>>> I want to apply a Python function to values in a fixed-size sliding window.
>>> I'm trying it because
>>> - I'm currently working on time-series prediction using deep learning, 
>>> which is why I need a sliding window to get the latest N items from the 
>>> unbounded data stream.
>>> - I already have a DNN written using Keras on top of Theano (Keras and 
>>> Theano are Python libraries) in order to exploit Nvidia's CUDA library.
>>> - There is no Python DataStream API, so I tried to use Scala DataStream API.
>>> - PySpark's structured streaming does not allow me to define a UDAF (see a 
>>> question I posted on Stack Overflow about it: 
>>> http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0)
>>> - The Spark DStream API does not look promising for this case due to its 
>>> lack of count-window support.
>>> 
>>> For these reasons, I quickly wrote a toy example to test the feasibility 
>>> of applying Python methods to values in the sliding window.
>>> --------
>>> import jep.Jep
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.util.Collector
>>> import org.apache.flink.streaming.api.scala.function.AllWindowFunction
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
>>> 
>>> class WindowFunction extends AllWindowFunction[String, String, 
>>> GlobalWindow] {
>>>   // This Jep field becomes part of the function object that Flink
>>>   // serializes, which is what triggers the error below.
>>>   val jep = new Jep()
>>>   jep.runScript("prediction.py")
>>> 
>>>   override def apply(window: GlobalWindow, iter: Iterable[String], out: 
>>> Collector[String]): Unit = {
>>>     // ...
>>>   }
>>> }
>>> 
>>> object main {
>>>   def main(args: Array[String]): Unit = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     env.socketTextStream("localhost", 9999)
>>>       .countWindowAll(5, 1)
>>>       .apply(new WindowFunction())
>>>       .print()
>>>     env.execute()
>>>   }
>>> }
>>> --------
>>> 
>>> Now I'm facing a serialization error with the following messages:
>>> --------
>>> Exception in thread "main" 
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>>     at 
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
>>>     at 
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
>>>     at 
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
>>>     at 
>>> org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
>>>     at 
>>> org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
>>>     at main$.main(main.scala:23)
>>>     at main.main(main.scala)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>>> Caused by: java.io.NotSerializableException: jep.Jep
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at 
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at 
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at 
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at 
>>> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
>>>     at 
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
>>>     ... 11 more
>>> --------
>>> 
>>> Apparently, the source of the problem is the third-party library Jep, 
>>> which is used to call Python scripts.
>>> Do I have to make the third-party library serializable? 
>>> Or is there a way to handle this sort of thing in a totally different 
>>> way in Flink?
>>> 
>>> Any help (even with frameworks other than Flink) will be appreciated :-)
>>> Thank you.
>>> 
>>> - Dongwon
>> 
> 
