Re: CREATE TABLE LIKE clause from different catalog or database

2020-11-16 Thread 김동원
Hi Ingo, 

Thank you for letting me know! I didn’t know that had already been discussed. 

Best,

Dongwon

> On Nov 17, 2020, at 1:12 AM, Ingo Bürk wrote:
> 
> 
> Hi,
> 
> I ran into the same issue today. This is fixed in 1.11.3; the corresponding 
> bug was FLINK-19281.
> 
> A workaround is to switch the current catalog and database temporarily to 
> hive.navi and then not qualify the table name in the LIKE clause.
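> 
> For example, something along these lines (a rough, untested sketch against the 
> Table API, reusing the catalog/database/table names from your snippet):
> 
> // Temporarily make hive.navi the current catalog and database so the LIKE
> // source can be referenced without qualification.
> tEnv.useCatalog("hive")
> tEnv.useDatabase("navi")
> 
> // Fully qualify the target table instead.
> tEnv.executeSql("CREATE TABLE `inmem`.`default`.copied LIKE gps")
> 
> // Switch back afterwards.
> tEnv.useCatalog("inmem")
> tEnv.useDatabase("default")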
> 
> 
> Regards
> Ingo
> 
> 
>> On Mon, Nov 16, 2020, 17:04 Dongwon Kim  wrote:
>> Hi Danny~
>> Sorry for the late reply.
>> 
>> Let's take a look at a running example:
>>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>   .inBatchMode()
>>>   .build();
>>> 
>>> TableEnvironment tEnv = TableEnvironment.create(settings);
>>> 
>>> HiveCatalog hiveCatalog = new HiveCatalog("hive",null, args[1]);
>>> tEnv.registerCatalog("hive", hiveCatalog);
>>> 
>>> GenericInMemoryCatalog inmemCatalog = new GenericInMemoryCatalog("inmem");
>>> tEnv.registerCatalog("inmem", inmemCatalog);
>>> tEnv.useCatalog("inmem");
>>> 
>>> TableResult result = tEnv.executeSql(
>>>   "CREATE TABLE copied LIKE hive.navi.gps"
>>> );
>> 
>> I've got the following log messages:
>>> 00:50:22,157 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
>>> [] - Setting hive conf dir as /Users/eastcirclek/hive-conf
>>> 00:50:22,503 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
>>> [] - Created HiveCatalog 'hive'
>>> 00:50:22,515 INFO  hive.metastore   
>>> [] - Trying to connect to metastore with URI thrift://...:9083
>>> 00:50:22,678 INFO  hive.metastore   
>>> [] - Connected to metastore.
>>> 00:50:22,678 INFO  org.apache.flink.table.catalog.hive.HiveCatalog  
>>> [] - Connected to Hive metastore
>>> 00:50:22,799 INFO  org.apache.flink.table.catalog.CatalogManager
>>> [] - Set the current default catalog as [inmem] and the current default 
>>> database as [default].
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> Source table '`inmem`.`default`.`hive.navi.gps`' of the LIKE clause not 
>>> found in the catalog, at line 1, column 26
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lambda$lookupLikeSourceTable$1(SqlCreateTableConverter.java:147)
>>> at java.util.Optional.orElseThrow(Optional.java:290)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.lookupLikeSourceTable(SqlCreateTableConverter.java:147)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:96)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>> at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>> at 
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>>> at Test.main(Test.java:53)
>> 
>> It seems like hive.navi.gps is recognized as a table name as a whole. 
>> I currently declare such a table by specifying all fields without the LIKE 
>> clause.
>> 
>> Am I missing something?
>> 
>> FYI, I'm working with Flink-1.11.2.
>> 
>> Thank you~
>> 
>> Best,
>> 
>> Dongwon
>> 
>> 
>>> On Fri, Nov 13, 2020 at 5:19 PM Danny Chan  wrote:
>>> Hi Dongwon ~
>>> 
>>> A table from a different catalog/db is supported; you need to specify the full 
>>> path of the source table:
>>> 
>>> CREATE TABLE Orders_with_watermark (
>>> ...
>>> ) WITH (
>>> ...
>>> )
>>> LIKE my_catalog.my_db.Orders;
>>> 
>>> On Wed, Nov 11, 2020 at 2:53 PM, Dongwon Kim wrote:
 Hi,
 
 Is it disallowed to refer to a table from a different database or catalog 
 when creating a table?
 
 According to [1], there's no way to refer to tables belonging to different 
 databases or catalogs.
 
 [1] 
 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
 
 Best,
 
 Dongwon


Re: User program failures cause JobManager to be shutdown

2019-12-09 Thread 김동원
Hi Robert,

Yeah, I know. For the moment, I warned my colleagues not to call System.exit() 
:-) But it needs to be implemented for the sake of Flink usability as you 
described in the issue.
Thanks a lot for taking care of this issue.

Best,

Dongwon

> On Mon, Dec 9, 2019 at 9:55 PM, Robert Metzger wrote:
> 
> 
> Hey Dongwon,
> I filed a ticket: https://issues.apache.org/jira/browse/FLINK-15156
> This does not mean it will be implemented anytime soon :)
> 
>> On Mon, Dec 9, 2019 at 2:25 AM Dongwon Kim  wrote:
>> Hi Robert and Roman, 
>> Yeah, letting users know System.exit() is called would be much more 
>> appropriate than just intercepting and ignoring.
>> 
>> Best,
>> Dongwon
>> 
>>> On Sat, Dec 7, 2019 at 11:29 PM Robert Metzger  wrote:
>>> I guess we could manage the security only when calling the user's main() 
>>> method.
>>> 
>>> This problem actually exists for all usercode in Flink: You can also kill 
>>> TaskManagers like this.
>>> If we are going to add something like this to Flink, I would only log that 
>>> System.exit() has been called by the user code, not intercept and ignore 
>>> the call.
>>> 
 On Fri, Dec 6, 2019 at 10:31 AM Khachatryan Roman 
  wrote:
 Hi Dongwon,
 
 This should work but it could also interfere with Flink itself exiting in 
 case of a fatal error.
 
 Regards,
 Roman
 
 
> On Fri, Dec 6, 2019 at 2:54 AM Dongwon Kim  wrote:
> FYI, we've launched a session cluster where multiple jobs are managed by 
> a single JobManager. If one job calls System.exit(), all the other jobs also 
> fail because the JobManager is shut down and all the TaskManagers get into 
> chaos (failing to connect to the JobManager).
> 
> I just searched for a way to prevent System.exit() calls from terminating 
> the JVM and found [1]. Could this be a possible solution to the problem?
> 
> [1] 
> https://stackoverflow.com/questions/5549720/how-to-prevent-calls-to-system-exit-from-terminating-the-jvm
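> 
> Something like the following is what I have in mind (a rough, untested sketch 
> of the idea in [1]; the class name is just a placeholder):
> 
> import java.security.Permission
> 
> // Turn System.exit() calls from user code into an exception instead of
> // letting them kill the JVM (alternatively, Flink could just log the call).
> class ExitTrappingSecurityManager extends SecurityManager {
>   override def checkExit(status: Int): Unit =
>     throw new SecurityException(s"Intercepted System.exit($status)")
> 
>   // Permit everything else so normal operation is unaffected.
>   override def checkPermission(perm: Permission): Unit = ()
> }
> 
> // Installed before the user code runs:
> System.setSecurityManager(new ExitTrappingSecurityManager)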
> 
> Best,
> - Dongwon
> 
>> On Fri, Dec 6, 2019 at 10:39 AM Dongwon Kim  
>> wrote:
>> Hi Robert and Roman,
>> 
>> Thank you for taking a look at this.
>> 
>>> what is your main() method / client doing when it's receiving wrong 
>>> program parameters? Does it call System.exit(), or something like that?
>> 
>> I just found that our HTTP client is programmed to call System.exit(1). 
>> I should advise people not to call System.exit() in Flink applications. 
>> 
>> p.s. Just out of curiosity, is there no way for the web app to intercept 
>> System.exit() and prevent the job manager from being shut down?
>> 
>> Best,
>> 
>> - Dongwon
>> 
>>> On Fri, Dec 6, 2019 at 3:59 AM Robert Metzger  
>>> wrote:
>>> Hi Dongwon,
>>> 
>>> what is your main() method / client doing when it's receiving wrong 
>>> program parameters? Does it call System.exit(), or something like that?
>>> 
>>> By the way, the http address from the error message is publicly 
>>> available. Not sure if this is internal data or not.
>>> 
 On Thu, Dec 5, 2019 at 6:32 PM Khachatryan Roman 
  wrote:
 Hi Dongwon,
 
 I wasn't able to reproduce your problem with Flink JobManager 1.9.1 
 with various kinds of errors in the job.
 I suggest you try it on a fresh Flink installation without any other 
 jobs submitted.
 
 Regards,
 Roman
 
 
> On Thu, Dec 5, 2019 at 3:48 PM Dongwon Kim  
> wrote:
> Hi Roman,
> 
> We're using the latest version 1.9.1 and those two lines are all I've 
> seen after executing the job on the web ui.
> 
> Best,
> 
> Dongwon
> 
>> On Thu, Dec 5, 2019 at 11:36 PM r_khachatryan 
>>  wrote:
>> Hi Dongwon,
>> 
>> Could you please provide the Flink version you are running and the 
>> JobManager logs?
>> 
>> Regards,
>> Roman
>> 
>> 
>> eastcirclek wrote
>> > Hi,
>> > 
>> > I tried to run a program by uploading a jar on the Flink UI. When I
>> > intentionally enter a wrong parameter to my program, the JobManager dies.
>> > Below are all the log messages I can get from the JobManager; it dies as
>> > soon as it emits the second line:
>> > 
>> > 2019-12-05 04:47:58,623 WARN
>> >>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
>> >> Configuring the job submission via query parameters is 
>> >> deprecated. Please
>> >> migrate to submitting a JSON request instead.
>> >>
>> >>
>> >> *2019-12-05 04:47:59,133 ERROR com.skt.apm.http.HTTPClient
>> >>   - Cannot
>> >> 

Re: Modify EventTimeTrigger only for event-time session windows

2018-07-23 Thread 김동원
.TriggerContext): TriggerResult = {
  TriggerResult.FIRE_AND_PURGE
  }

Instead of returning FIRE_AND_PURGE, I register a new event-time timer:

if (eval(element)) {
  ctx.registerEventTimeTimer(timestamp + delay)
}

Please note that the timestamp of the registered timer is not equal to 
window.maxTimestamp.
That's why I plan to return FIRE_AND_PURGE whenever 
DelayedEarlyResultEventTimeTrigger.onEventTime is called as below:

override def onEventTime(time: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
  TriggerResult.FIRE_AND_PURGE
}

What I want to be sure about here is that the original 
EventTimeTrigger.onEventTime is not called except when a session window expires.
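
Putting the pieces together, the trigger I have in mind looks roughly like the 
sketch below (simplified and untested; MyEvent, eval(), and the delay are 
placeholders for our actual event type and end-of-session check):

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Placeholder event type; eval() stands for our check for the special
// end-of-session event.
case class MyEvent(isEndOfSession: Boolean)

class DelayedEarlyResultEventTimeTrigger(delay: Long) extends Trigger[MyEvent, TimeWindow] {

  override def onElement(element: MyEvent, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Regular end-of-window timer, as in the original EventTimeTrigger.
    ctx.registerEventTimeTimer(window.maxTimestamp)
    // Additional early-firing timer when the special event arrives.
    if (eval(element)) {
      ctx.registerEventTimeTimer(timestamp + delay)
    }
    TriggerResult.CONTINUE
  }

  // Fires and purges for both the early timer and window.maxTimestamp.
  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE_AND_PURGE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Session windows merge, so the trigger has to support merging.
  override def canMerge(): Boolean = true

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit =
    ctx.registerEventTimeTimer(window.maxTimestamp)

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.deleteEventTimeTimer(window.maxTimestamp)

  private def eval(element: MyEvent): Boolean = element.isEndOfSession
}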




I'm not sure whether I understand your question and answer correctly, but 
I hope this gives you a detailed idea of what I'm trying to figure out.

p.s. We have a large session gap (1 hour) and do not enable allowed 
lateness.

- Dongwon


On Tue, Jul 24, 2018 at 12:05 AM, Aljoscha Krettek wrote:
Out of curiosity, why don't you want to keep it like this?

@Override
public TriggerResult onEventTime(long time, TimeWindow window, 
        TriggerContext ctx) {
    return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
}

I mean checking for the maxTimestamp().

Best,
Aljoscha

> On 23. Jul 2018, at 16:10, 김동원 wrote:
> 
> Hi Aljoscha,
> 
> If that is the only case, I need to return TriggerResult.CONTINUE when time > 
> window.maxTimestamp.
> 
> It is very fortunate that onEventTime is not called when time < 
> window.maxTimestamp, except for my own timer.
> 
> Thanks a lot for your quick reply.
> 
> Best,
> 
> - Dongwon
> 
> On 23. Jul 2018, at 22:58, Aljoscha Krettek wrote:
> 
>> Hi,
>> 
>> If you set an allowed lateness that is greater than zero you will get 
>> another call to onEventTime() on window.maxTimestamp + allowedLateness.
>> 
>> Does that help answer your question?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 23. Jul 2018, at 15:40, Dongwon Kim wrote:
>>> 
>>> Hi all,
>>> 
>>> I want to be sure about when EventTimeTrigger.onEventTime() method is 
>>> called with event-time session windows.
>>> It returns TriggerResult.FIRE only when the timestamp of the registered 
>>> timer equals to the max timestamp of the current window:
>>> 
>>> @Override
>>> public TriggerResult onEventTime(long time, TimeWindow window, 
>>>         TriggerContext ctx) {
>>>     return time == window.maxTimestamp() ?
>>>             TriggerResult.FIRE :
>>>             TriggerResult.CONTINUE;
>>> }
>>> 
>>> As far as I understand, when EventTimeTrigger is used with an event-time 
>>> session window, there's no chance of EventTimeTrigger.onEventTime being 
>>> called with time != window.maxTimestamp.
>>> Is that true? If not, could anyone point out the corner case?
>>> 
>>> The reason I'm asking is that I want to register an event-time timer 
>>> when my custom trigger receives a special event which signifies the end of 
>>> a session.
>>> The timestamp of the registered timer is not going to be equal to 
>>> window.maxTimestamp, and I want to return TriggerResult.FIRE_AND_PURGE in 
>>> such a case.
>>> As I also want to purge the content of a window when it expires, 
>>> onEventTime should look like this:
>>> 
>>> @Override
>>> public TriggerResult onEventTime(long time, TimeWindow window, 
>>>         TriggerContext ctx) {
>>>     return TriggerResult.FIRE_AND_PURGE;
>>> }
>>> 
>>> It will return FIRE_AND_PURGE every time MyTrigger.onEventTime is called.
>>> It looks quite dangerous and I'm not quite sure about that.
>>> 
>>> - Dongwon
>>> 
>> 




Re: Modify EventTimeTrigger only for event-time session windows

2018-07-23 Thread 김동원
Hi Aljoscha,

If that is the only case, I need to return TriggerResult.CONTINUE when time > 
window.maxTimestamp.

It is very fortunate that onEventTime is not called when time < 
window.maxTimestamp, except for my own timer.

Thanks a lot for your quick reply.

Best,

- Dongwon

On 23. Jul 2018, at 22:58, Aljoscha Krettek wrote:

> Hi,
> 
> If you set an allowed lateness that is greater than zero you will get another 
> call to onEventTime() on window.maxTimestamp + allowedLateness.
> 
> Does that help answer your question?
> 
> Best,
> Aljoscha
> 
>> On 23. Jul 2018, at 15:40, Dongwon Kim  wrote:
>> 
>> Hi all,
>> 
>> I want to be sure about when EventTimeTrigger.onEventTime() method is called 
>> with event-time session windows.
>> It returns TriggerResult.FIRE only when the timestamp of the registered 
>> timer equals to the max timestamp of the current window:
>> 
>>  @Override
>>  public TriggerResult onEventTime(long time, TimeWindow window, 
>>          TriggerContext ctx) {
>>      return time == window.maxTimestamp() ?
>>              TriggerResult.FIRE :
>>              TriggerResult.CONTINUE;
>>  }
>> 
>> As far as I understand, when EventTimeTrigger is used with an event-time 
>> session window, there's no chance of EventTimeTrigger.onEventTime being 
>> called with time != window.maxTimestamp.
>> Is that true? If not, could anyone point out the corner case?
>> 
>> The reason I'm asking is that I want to register an event-time timer 
>> when my custom trigger receives a special event which signifies the end of a 
>> session.
>> The timestamp of the registered timer is not going to be equal to 
>> window.maxTimestamp, and I want to return TriggerResult.FIRE_AND_PURGE in 
>> such a case.
>> As I also want to purge the content of a window when it expires, onEventTime 
>> should look like this:
>> 
>>  @Override
>>  public TriggerResult onEventTime(long time, TimeWindow window, 
>>          TriggerContext ctx) {
>>      return TriggerResult.FIRE_AND_PURGE;
>>  }
>> 
>> It will return FIRE_AND_PURGE every time MyTrigger.onEventTime is called.
>> It looks quite dangerous and I'm not quite sure about that.
>> 
>> - Dongwon
>> 
> 


JobManager not receiving resource offers from Mesos

2018-01-02 Thread 김동원
Hi,

I'm trying to launch a Flink cluster on top of dc/os, but TaskManagers are not 
launched at all.

What I do to launch a Flink cluster is as follows:
- Click "flink" from "Catalog" on the left panel of dc/os GUI.
- Click "Run service" without any modification on configuration for the purpose 
of testing (Figure 1).

Up to this point, everything seems okay, as shown in Figure 2.
However, Figure 3 shows that no TaskManager has ever been launched.

So I took a look at the JobManager log (see the attached "log.txt" for the full log).
LaunchCoordinator keeps emitting the same log messages while staying in the 
"GatheringOffers" state, as follows:
INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- 
Processing 1 task(s) against 0 new offer(s) plus outstanding off$
DEBUG com.netflix.fenzo.TaskScheduler   - Found 0 
VMs with non-zero offers to assign from
INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- Resources 
considered: (note: expired offers not deducted from be$
DEBUG org.apache.flink.mesos.scheduler.LaunchCoordinator- 
SchedulingResult{resultMap={}, failures={}, leasesAdded=0, lease$
INFO  org.apache.flink.mesos.scheduler.LaunchCoordinator- Waiting 
for more offers; 1 task(s) are not yet launched.
(FYI, ConnectionMonitor is in its "ConnectedState" as you can see in the full 
log file.)

Can anyone point out what's going wrong with my dc/os installation?
Thank you for your attention. I'm really looking forward to running Flink clusters 
on dc/os :-)

p.s. I tested whether dc/os is working correctly by using the following script, 
and it works.
{
 "id": "simple-gpu-test",
 "acceptedResourceRoles":["slave_public", "*"],
 "cmd": "while [ true ] ; do nvidia-smi; sleep 5; done",
 "cpus": 1,
 "mem": 128,
 "disk": 0,
 "gpus": 1,
 "instances": 8
}




Re: Proper way to call a Python function in WindowFunction.apply()

2017-03-14 Thread 김동원
Alright, it works perfectly.
I checked that my Python methods are properly executed inside 
RichWindowFunction.
Thanks a lot!

p.s. For those who wonder why I use Jep, refer to 
https://sushant-hiray.me/posts/python-in-scala-stack/ to grasp the idea of 
using Python inside Java through Jep instead of Jython and JyNI.



import jep.Jep
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class WindowFunction extends RichAllWindowFunction[String, String, GlobalWindow] {
  // Jep is not serializable, so it is created per task in open() rather than
  // being shipped with the function's closure.
  var jep: Option[Jep] = None

  override def open(parameters: Configuration): Unit = {
    jep = Some(new Jep())
    jep.foreach(_.runScript("prediction.py"))
  }

  override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
    // ...
  }
}

> On 15.03.2017 01:27, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Hey,
> 
> Naturally this would imply that your script is available on all nodes, so 
> you will have to distribute it manually.
> 
> On 14.03.2017 17:23, Chesnay Schepler wrote:
>> Hello,
>> 
>> I would suggest implementing the RichWindowFunction instead, and instantiating 
>> Jep within open(), or maybe doing some lazy instantiation within apply().
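>> 
>> The lazy variant would look roughly like this (just a sketch, untested; imports 
>> as in your example):
>> 
>> class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
>>   // Nothing is created on the client; the field stays null until the function
>>   // actually runs on a TaskManager, so no non-serializable state is shipped.
>>   @transient private var jep: Jep = _
>> 
>>   override def apply(window: GlobalWindow, iter: Iterable[String], out: Collector[String]): Unit = {
>>     if (jep == null) {
>>       jep = new Jep()
>>       jep.runScript("prediction.py")
>>     }
>>     // ... use jep on the window contents ...
>>   }
>> }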
>> 
>> Regards,
>> Chesnay
>> 
>> On 14.03.2017 15:47, 김동원 wrote:
>>> Hi all,
>>> 
>>> What is the proper way to call a Python function in WindowFunction.apply()?
>>> 
>>> I want to apply a Python function to values in a fixed-size sliding window.
>>> I'm trying it because
>>> - I'm currently working on time-series prediction using deep learning, 
>>> which is why I need a sliding window to get the latest N items from the 
>>> unbounded data stream.
>>> - I already have a DNN written using Keras on top of Theano (Keras and 
>>> Theano are Python libraries) in order to exploit Nvidia's CUDA library . 
>>> - There is no Python DataStream API, so I tried to use Scala DataStream API.
>>> - PySpark's structured streaming does not allow me to define UDAF (see a 
>>> question I posted on stackoverflow about it: 
>>> http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0)
>>> - Spark DStream API does not look promising for this case due to the lack of 
>>> support for count windows.
>>> 
>>> For these reasons, I thoughtlessly wrote a toy example to see the 
>>> feasibility of applying Python methods to values in the sliding window.
>>> 
>>> import jep.Jep
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>> import org.apache.flink.util.Collector
>>> import org.apache.flink.streaming.api.scala.function.AllWindowFunction
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
>>> 
>>> class WindowFunction extends AllWindowFunction[String, String, 
>>> GlobalWindow] {
>>>   val jep = new Jep()
>>>   jep.runScript("prediction.py")
>>> 
>>>   override def apply(window: GlobalWindow, iter: Iterable[String], out: 
>>> Collector[String]): Unit = {
>>> // ...
>>>   }
>>> }
>>> 
>>> object main {
>>>   def main(args: Array[String]): Unit = {
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.socketTextStream("localhost", )
>>>   .countWindowAll(5, 1)
>>>   .apply(new WindowFunction())
>>>   .print()
>>> env.execute()
>>>   }
>>> }
>>> 
>>> 
>>> Now I'm facing a serialization error, with the following error messages:
>>> 
>>> Exception in thread "main" 
>>> org.apache.flink.api.common.InvalidProgramException: Task not serializable
>>> at 
>>> org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
>>> at 
>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
>>> at 
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
>>> at 
>>> org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
>>> at 
>>> org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
>>> at main$.main(main.scala:23)
>>&

Proper way to call a Python function in WindowFunction.apply()

2017-03-14 Thread 김동원
Hi all,

What is the proper way to call a Python function in WindowFunction.apply()?

I want to apply a Python function to values in a fixed-size sliding window.
I'm trying it because
- I'm currently working on time-series prediction using deep learning, which is 
why I need a sliding window to get the latest N items from the unbounded data 
stream.
- I already have a DNN written using Keras on top of Theano (Keras and Theano 
are Python libraries) in order to exploit Nvidia's CUDA library . 
- There is no Python DataStream API, so I tried to use Scala DataStream API.
- PySpark's structured streaming does not allow me to define UDAF (see a 
question I posted on stackoverflow about it: 
http://stackoverflow.com/questions/42747236/how-to-define-udaf-over-event-time-windows-in-pyspark-2-1-0)
- Spark DStream API does not look promising for this case due to the lack of 
support for count windows.

For these reasons, I thoughtlessly wrote a toy example to see the feasibility 
of applying Python methods to values in the sliding window.

import jep.Jep
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

class WindowFunction extends AllWindowFunction[String, String, GlobalWindow] {
  val jep = new Jep()
  jep.runScript("prediction.py")

  override def apply(window: GlobalWindow, iter: Iterable[String], out: 
Collector[String]): Unit = {
// ...
  }
}

object main {
  def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("localhost", )
  .countWindowAll(5, 1)
  .apply(new WindowFunction())
  .print()
env.execute()
  }
}


Now I'm facing a serialization error, with the following error messages:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
Task not serializable
at 
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at 
org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:666)
at 
org.apache.flink.streaming.api.scala.AllWindowedStream.clean(AllWindowedStream.scala:568)
at 
org.apache.flink.streaming.api.scala.AllWindowedStream.apply(AllWindowedStream.scala:315)
at main$.main(main.scala:23)
at main.main(main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.io.NotSerializableException: jep.Jep
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:317)
at 
org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 11 more


Apparently, the source of the problem is the third-party library called Jep, which 
helps call Python scripts.
Do I have to make the third-party library serializable? 
Or is there a way to handle this sort of thing in a totally different way in 
Flink?

Any help (even with frameworks other than Flink) will be appreciated :-)
Thank you.

- Dongwon