hwanju commented on pull request #14499:
URL: https://github.com/apache/flink/pull/14499#issuecomment-761572560


   > I believe sources are relatively important, in particular because some users have their own implementations there.
   > @hwanju do you want to address this in this PR, or in a follow up?
   
   This is a nice catch! It should be addressed as part of this PR. When this 
patch was written, the old Flink version invoked `run()` from `invoke()` in 
the same thread, but now, with the mailbox approach, it spawns a separate 
thread for `run()`. `FlinkSecurityManager` does support a thread-inherited 
monitoring flag, so why wasn't this spawned thread protected? The reason is 
that inheritable thread-local variables are copied when the thread is 
constructed, not when it runs, and the construction happens in 
`loadAndInstantiateInvokable`, before `invoke`. After wrapping 
`loadAndInstantiateInvokable` with exit monitoring, I see that it is 
protected, as follows:
   
   ```
   org.apache.flink.runtime.UserSystemExitException: Flink user code attempted to exit JVM.
        at org.apache.flink.runtime.security.FlinkSecurityManager.checkExit(FlinkSecurityManager.java:184)
        at java.lang.Runtime.exit(Runtime.java:107)
        at java.lang.System.exit(System.java:971)
        at org.apache.flink.streaming.tests.FailureTestSource.run(FailureTestSource.java:65)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:245)
   ```
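
   To illustrate the construction-time inheritance described above, here is a 
minimal standalone sketch. It does not use Flink or `FlinkSecurityManager`; 
the `MONITOR_EXIT` flag and the `flagSeenByChild` helper are hypothetical 
stand-ins, and the only assumption is that the monitoring flag behaves like a 
plain `InheritableThreadLocal`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InheritedFlagDemo {
    // Hypothetical stand-in for a thread-inherited monitoring flag.
    // This is NOT the actual field used by FlinkSecurityManager.
    static final InheritableThreadLocal<Boolean> MONITOR_EXIT =
            new InheritableThreadLocal<Boolean>() {
                @Override
                protected Boolean initialValue() {
                    return Boolean.FALSE;
                }
            };

    /**
     * Creates a child thread either while the flag is set (and clears it
     * again before start), or before the flag is set, and returns the flag
     * value the child observes when it runs.
     */
    static boolean flagSeenByChild(boolean constructWhileMonitored) {
        AtomicBoolean seen = new AtomicBoolean();
        Runnable body = () -> seen.set(MONITOR_EXIT.get());
        Thread child;
        try {
            if (constructWhileMonitored) {
                MONITOR_EXIT.set(true);
                child = new Thread(body);  // flag value is copied here
                MONITOR_EXIT.set(false);   // later changes do not reach the child
            } else {
                child = new Thread(body);  // constructed while the flag is false
                MONITOR_EXIT.set(true);    // too late: the child already has its copy
            }
            child.start();
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            MONITOR_EXIT.set(false);       // reset the caller's flag
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(flagSeenByChild(true));   // prints "true"
        System.out.println(flagSeenByChild(false));  // prints "false"
    }
}
```

   The second case mirrors the situation in this comment: the flag is set 
around the code that starts the thread, but the thread object was created 
earlier, so the thread never sees it.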
   
   The only thing left right now is unit test coverage. Having spent a bit of 
time on it, testing `SourceStreamTask` does not seem as straightforward as 
testing `StreamTask`, due to some minor issues such as serialization. It's not 
impossible, and maybe I just don't know the proper test utilities yet. I will 
try a bit more and update the PR with the conclusion.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

