[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications
Github user meyer-net commented on the issue: https://github.com/apache/flink/pull/3838 Does version 1.5 support the Python API for streaming applications? ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 Here's a preliminary changelog:

General:
- rebased branch onto current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration - set to provided
- scala version set to scala.binary.version
- flink version set to project.version
- applied checkstyle - disabled method/parameter name rules for API classes
- assigned flink-python-streaming to the 'libraries' travis profile

API:
- PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output() - print is a keyword in Python and thus not usable in native Python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted a local-execution argument, as they are redundant given the environment factory methods

Moved/Renamed:
- made SerializerMap a top-level class and renamed it to AdapterMap
- moved UtilityFunctions#adapt to the AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream

Functions:
- introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during initialization
- added generic return type to SerializationUtils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs - improved error printing
- PythonCollector now typed to Object and properly converts non-PyObjects
- jython functions that use a collector now have Object as output type - otherwise you would get a ClassCastException if jython returns something that isn't a PyObject

PythonStreamBinder:
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable - introduced PythonOptions
- no longer generates plan.py but instead imports it directly via the PythonInterpreter

Environment:
- reworked the static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using static fields
- removed PythonEnvironmentConfig

Tests:
- removed `if __name__ == '__main__':` blocks from tests since the condition is never fulfilled
- removed python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest
- run_all_tests improvements: stop after first failure; print stacktrace on failure; no longer relies on dirname() to get the cwd but uses the module file location instead ---
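One of the binder changes above - replacing Random with UUID.randomUUID() for temp directory names - can be sketched in plain Python. The helper name below is hypothetical; only the `flink_streaming_plan_` prefix mirrors the paths seen later in this thread:

```python
import os
import tempfile
import uuid

def make_plan_dir_name():
    """Build a collision-resistant temp directory name, analogous to
    replacing java.util.Random with UUID.randomUUID() in PythonStreamBinder."""
    # uuid4 provides 122 random bits, so concurrent binder runs will
    # practically never pick the same directory name.
    return os.path.join(tempfile.gettempdir(),
                        "flink_streaming_plan_" + uuid.uuid4().hex)

print(make_plan_dir_name())
```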
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 I've been digging into this for the past week. I found a number of things to improve and did so in a local branch. Once I've finalized/tested things (probably tomorrow) I'll link the branch here or open another PR. ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 There is none that I'm aware of. It is also possible for the JM and TM to run in the same JVM, say for tests or in local mode. I can't think of a nice way to solve this, so I suggest we simply disable the check for the PythonEnvironmentConfig class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 Referring to the issue with ```PythonEnvironmentConfig``` above, is there any other global indication that I can use to test whether a given function is executed on the TaskManager? ---
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 One of the critical attributes is ```PythonEnvironmentConfig::pythonTmpCachePath```, which is used in the following places:
- ```PythonStreamExecutionEnvironment::execute:362```
- ```PythonStreamExecutionEnvironment::execute:400```
- ```PythonStreamBinder::prepareFiles:117```
On the client side, the temporary files are prepared for distribution by ```PythonStreamBinder``` and then processed by the ```PythonStreamExecutionEnvironment::execute``` function, which is called from the Python script. When the Python script is executed on the TaskManager, this attribute remains ```null``` and thus ```execute``` returns immediately. ---
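The early-return behavior described above can be mimicked in a few lines of plain Python (the class and function below are loose stand-ins for the Java originals, not the actual implementation): the attribute is a class-level default that only the client-side binder ever assigns, so code running in a different JVM still sees the default and bails out.

```python
class PythonEnvironmentConfig(object):
    # Stand-in for the Java class: a "public static" field that only the
    # client-side PythonStreamBinder assigns. A TaskManager runs in a
    # different JVM, so it still sees the default value.
    python_tmp_cache_path = None

def execute():
    """Mimics the guard in PythonStreamExecutionEnvironment::execute."""
    if PythonEnvironmentConfig.python_tmp_cache_path is None:
        # TaskManager side: nothing was prepared, return immediately
        return "returned immediately"
    return "job submitted"

print(execute())  # the binder never ran here, so the guard fires
```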
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 The only usage I found is in `UtilityFunctions#smartFunctionDeserialization`, which is only called from various Java UDF classes. Unless there is another usage hidden somewhere, I would suggest adding a `PythonEnvironmentConfig` argument to the `smartFunctionDeserialization` method and to all UDF classes. ---
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 The thing is that I use the ```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver information from ```PythonStreamBinder``` to a class that is called from the Python script. How would you suggest doing it otherwise? ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 The check failed because the spotbugs plugin found something; this plugin isn't run by default when calling `mvn verify`. You can run spotbugs locally by adding `-Dspotbugs` to the maven invocation. The problem it found is the PythonEnvironmentConfig class, which contains public static non-final fields. I propose making these non-static and explicitly passing around a config object where needed. ---
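The proposed refactoring, sketched in Python with hypothetical function names (the actual change would of course be on the Java side): turn the static fields into instance fields and hand the config object explicitly to whoever needs it.

```python
class PythonEnvironmentConfig(object):
    """Instance-based config instead of public static non-final fields."""
    def __init__(self, python_tmp_cache_path):
        self.python_tmp_cache_path = python_tmp_cache_path

def prepare_files(config):
    # hypothetical binder step: stage files under the configured path
    return "staging under " + config.python_tmp_cache_path

def execute(config):
    # the same object is passed explicitly instead of read from static state
    return prepare_files(config) + " and submitting"

cfg = PythonEnvironmentConfig("/tmp/flink_cache")
print(execute(cfg))  # staging under /tmp/flink_cache and submitting
```

The advantage over static fields is that two jobs in the same JVM (e.g. JobManager and TaskManager in local mode, as discussed above) can no longer clobber each other's configuration.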
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 I'm trying to track down the root cause of the check failures, without success. Obviously, the given project (flink-libraries/flink-streaming-python) on the master branch passes `verify` successfully in my environment. Please advise. ---
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 Regarding the exception - ```java.io.IOException: java.io.IOException: The given HDFS file URI ...``` In general, using the Python interface requires a valid configuration of a shared file system (e.g. HDFS), which is designed to distribute the Python files. One can bypass this issue by setting the second argument to ```True``` when calling ```env.execute(...)``` in the Python script. ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 When running the example against a local cluster I got this exception
```
java.io.IOException: java.io.IOException: The given HDFS file URI (hdfs:/tmp/flink_cache_-4117839671387669278) did not describe the HDFS NameNode. The attempt to use a default HDFS configuration, as specified in the 'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the following problem: Either no default file system was registered, or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) describing the (hdfs namenode) host and port.
```
---
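Note that the URI in the message, `hdfs:/tmp/...`, has a single slash after the scheme and therefore carries no authority component. Standard URI parsing shows why no NameNode host/port could be extracted from it (a plain-Python illustration of the generic URI rule, not of Flink's actual parsing code; the `namenode:9000` host/port is a hypothetical example):

```python
from urllib.parse import urlparse

broken = urlparse("hdfs:/tmp/flink_cache_-4117839671387669278")
ok = urlparse("hdfs://namenode:9000/tmp/flink_cache")

# A single slash after the scheme means there is no authority component,
# so the URI itself names no NameNode host/port - hence the fallback to
# fs.default.name / fs.defaultFS, which also wasn't configured.
print(repr(broken.netloc))  # ''
print(repr(ok.netloc))      # 'namenode:9000'
```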
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 I can't seem to get the tests running. In the IDE I get this exception:
```
null
Traceback (most recent call last):
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\plan.py", line 3, in <module>
    run_all_testsae1bf92fc871d56dae4598b332a87804.main()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 71, in main
    Main().run()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 45, in run
    tests.append(__import__(test_module_name, globals(), locals()))
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\test_filter.py", line 25, in <module>
    from utils.python_test_base import TestBase
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\utils\python_test_base.py", line 19, in <module>
    from org.apache.flink.api.java.utils import ParameterTool
java.lang.NoClassDefFoundError: org/apache/flink/api/java/utils (wrong name: org/apache/flink/api/java/Utils)
	at java.lang.ClassLoader.defineClass1(Native Method)
```
On the command line the tests do run, but fail with this exception:
```
Submitting job ... 'test_filter'
Get execution environment
2> (50, u'hello')
null
Traceback (most recent call last):
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\plan.py", line 3, in <module>
    run_all_testsae1bf92fc871d56dae4598b332a87804.main()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 71, in main
    Main().run()
  File "C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\run_all_testsae1bf92fc871d56dae4598b332a87804.py", line 59, in run
    print("\n{}\n{}\n{}\n".format('#'*len(ex_type), ex_type, '#'*len(ex_type)))
TypeError: object of type 'java.lang.Class' has no len()
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 17.835 sec <<< FAILURE! - in org.apache.flink.streaming.python.api.PythonStreamBinderTest
testJob(org.apache.flink.streaming.python.api.PythonStreamBinderTest)  Time elapsed: 17.393 sec  <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: null
```
---
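Two distinct failures show up in the tracebacks above. The first (`wrong name: org/apache/flink/api/java/Utils` when importing `...utils`) looks like a case-insensitive filesystem collision on Windows between the `utils` package and the `Utils` class. The second is in the test runner's error reporting itself: `ex_type` is a Jython-wrapped `java.lang.Class`, which doesn't support `len()`. Below is a plain-Python reproduction of that second failure mode, plus the obvious defensive fix of formatting through `str()` first (a hypothetical patch sketch, not the actual runner code):

```python
class JavaClassStandIn(object):
    """Stand-in for a Jython-wrapped java.lang.Class: defines no __len__."""
    def __str__(self):
        return "java.lang.NoClassDefFoundError"

ex_type = JavaClassStandIn()

try:
    banner = '#' * len(ex_type)  # the same call the test runner makes
except TypeError:
    banner = None                # "object of type ... has no len()"

# Defensive version: go through str() so any object, Java or Python, works.
ex_text = str(ex_type)
safe_banner = '#' * len(ex_text)
print("\n{}\n{}\n{}\n".format(safe_banner, ex_text, safe_banner))
```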
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 I'm gonna take a deeper look now, and play around with it a bit. Be aware that if you rebase the branch again you will hit a myriad of checkstyle violations; we will have to suppress the checks for method names for this module. ---
Github user zohar-mizrahi commented on the issue: https://github.com/apache/flink/pull/3838 In the last change, I've rebased locally on top of origin/master, so I did `git push -f` to the master branch in my fork. ---
Github user zentol commented on the issue: https://github.com/apache/flink/pull/3838 It may take a while until I can review this; the 1.3 feature freeze just kicked in and it's time to test all the new features in depth. ---