[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2018-01-24 Thread meyer-net
Github user meyer-net commented on the issue:

https://github.com/apache/flink/pull/3838
  
Does version 1.5 support the Python API for streaming applications?


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2018-01-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
Here's a preliminary changelog:

General:
- rebase branch to current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration
- set to provided
- scala version set to scala.binary.version 
- flink version set to project.version
- applied checkstyle
- disabled method/parameter name rules for API classes
- assigned flink-python-streaming to 'libraries' travis profile

API:
- PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output()
- print is a keyword in python and thus not usable in native python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted local execution argument as 
they are redundant due to environment factory methods
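A quick sketch (run under CPython 3) of the motivation for the `print()` → `output()` rename; `ds` is a hypothetical stand-in for a data stream:

```python
# Why PDS#print() was renamed to PDS#output(): in Python 2, which Jython
# implements, `print` is a statement keyword and cannot appear as a method
# name in user scripts.  The Python 3 grammar accepts the same source:
try:
    compile("ds.print()", "<plan>", "exec")
    compiles_under_py3 = True
except SyntaxError:
    compiles_under_py3 = False
# Under the Python 2 grammar that Jython uses, the compile() call above
# would raise SyntaxError -- hence the rename to output().
```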

Moved/Renamed:
- made SerializerMap top-level class and renamed it to AdapterMap
- Moved UtilityFunctions#adapt to AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to 
SerialVersionOverridingPythonObjectInputStream

Functions:
- Introduced AbstractPythonUDF class for sharing 
RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during 
initialization
- added generic return type to SerializationUtils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs
- improved error printing

- PythonCollector now typed to Object and properly converts non-PyObjects
- jython functions that use a collector now have Object as output type
- otherwise you would get ClassCastException if jython returns 
something that isn't a PyObject
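A minimal sketch of the collector change above, with hypothetical names standing in for the Jython types:

```python
# Typing the output as a plain object and converting on collect() avoids a
# ClassCastException when a jython function emits something that isn't
# already a PyObject.
class PyObjectStandIn:                      # stand-in for Jython's PyObject
    def __init__(self, value):
        self.value = value

class PythonCollectorSketch:
    def __init__(self):
        self.collected = []

    def collect(self, record):
        # convert non-PyObjects instead of assuming a hard cast succeeds
        if not isinstance(record, PyObjectStandIn):
            record = PyObjectStandIn(record)
        self.collected.append(record)

collector = PythonCollectorSketch()
collector.collect("plain string")           # previously a hard-cast failure
```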

PythonStreamBinder:
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable
- introduced PythonOptions
- no longer generate plan.py but instead import it directly via the 
PythonInterpreter
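A rough illustration of the "import it directly" point, using CPython's `importlib` as a stand-in for what the embedded Jython `PythonInterpreter` does (names and file contents are invented for the sketch):

```python
import importlib.util
import os
import tempfile
import textwrap

# Rather than generating a wrapper plan.py on disk, the user script can be
# loaded as a module directly by the embedded interpreter.
source = textwrap.dedent("""\
    def main(factory):
        return "plan executed"
""")
path = os.path.join(tempfile.mkdtemp(), "user_plan.py")
with open(path, "w") as f:
    f.write(source)

# load the script as a module instead of shelling out to a generated file
spec = importlib.util.spec_from_file_location("user_plan", path)
plan = importlib.util.module_from_spec(spec)
spec.loader.exec_module(plan)
result = plan.main(None)
```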

Environment:
- Reworked static environment factory methods from 
PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using 
static fields
- removed PythonEnvironmentConfig
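The factory rework above can be sketched as follows (all names hypothetical, not the actual Flink classes):

```python
# The program's main() receives a factory object, so directories travel with
# the factory instead of living in static fields.
class PythonEnvironmentFactorySketch:
    def __init__(self, local_tmp_dir):
        self.local_tmp_dir = local_tmp_dir

    def get_execution_environment(self):
        # the environment is built from the factory's own state
        return {"tmp_dir": self.local_tmp_dir}

def plan_main(factory):                     # plan scripts now accept the factory
    env = factory.get_execution_environment()
    return env["tmp_dir"]

tmp_dir = plan_main(PythonEnvironmentFactorySketch("/tmp/py"))
```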

Tests:
- removed 'if __name__ == '__main__':' blocks from tests since the 
condition is never fulfilled
- removed python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with 
PythonPlanBinderTest
- run_all_tests improvements
- stop after first failure
- print stacktrace on failure
- no longer relies on dirname() to get cwd but uses the module file 
location instead
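A minimal illustration of why the removed `if __name__ == '__main__':` guards could never fire (the module name is invented for the sketch):

```python
import types

# An imported module's __name__ is its module name, never '__main__', so a
# guarded block in a test module pulled in via __import__ is dead code.
mod = types.ModuleType("test_filter")       # simulate an imported test module
exec(compile("ran_main_block = (__name__ == '__main__')",
             "test_filter.py", "exec"), mod.__dict__)
# mod.ran_main_block is False: the guard condition is never fulfilled
```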


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2018-01-15 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I've been digging into this for the past week. I found a number of things 
to improve and did so in a local branch. Once I've finalized/tested things 
(probably tomorrow) I'll link the branch here or open another PR.


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-29 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
There is none that I'm aware of. It is also possible for the JM and TM to 
run in the same JVM, say for tests or in local mode.

I can't think of a nice way to solve this, so I suggest we simply disable 
the check for the PythonEnvironmentConfig class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-24 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
Referring to the issue with the ```PythonEnvironmentConfig``` above: is 
there any other global indication that I can use to test whether a given 
function is executed on the TaskManager?


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
One of the critical attributes is 
```PythonEnvironmentConfig::pythonTmpCachePath```, which is used in the 
following places: 
- ```PythonStreamExecutionEnvironment::execute:362```
- ```PythonStreamExecutionEnvironment::execute:400```
- ```PythonStreamBinder::prepareFiles:117```

On the client side, the temporary files are prepared for distribution by 
the ```PythonStreamBinder``` and then processed by the 
```PythonStreamExecutionEnvironment::execute``` function, which is called 
from the Python script. When the python script is executed on the TaskManager, 
this attribute remains ```null``` and thus, the ```execute``` returns 
immediately.
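The failure mode described above can be sketched like this (all names are hypothetical stand-ins, not the actual classes):

```python
# State stashed in a class-level ("static") field on the client never crosses
# into the TaskManager's JVM, so code there sees the unset default and bails.
class EnvironmentConfigSketch:
    python_tmp_cache_path = None            # each JVM has its own copy

def execute(tmp_cache_path):
    if tmp_cache_path is None:              # what the TaskManager observes
        return "returned immediately"
    return "job executed"

# The client JVM sets the field...
EnvironmentConfigSketch.python_tmp_cache_path = "/tmp/flink_cache"
# ...but a fresh JVM would still start from the default value:
task_manager_result = execute(None)
```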


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
The only usage I found is in 
`UtilityFunctions#smartFunctionDeserialization`, which is only called from 
various Java UDF classes. Unless there is another usage hidden somewhere, I 
would suggest adding a `PythonEnvironmentConfig` argument to the 
`smartFunctionDeserialization` method and all UDF classes.
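A sketch of that suggestion, with hypothetical Python stand-ins for the Java classes involved:

```python
# Thread a config object through as an explicit argument instead of reading
# global/static state -- it then works regardless of which JVM runs the code.
class PythonEnvironmentConfigSketch:
    def __init__(self, tmp_cache_path):
        self.tmp_cache_path = tmp_cache_path    # instance state, not static

def smart_function_deserialization(config, payload):
    # the config arrives as a parameter, not via a static field
    return (config.tmp_cache_path, payload)

cfg = PythonEnvironmentConfigSketch("/tmp/flink_cache")
path, payload = smart_function_deserialization(cfg, b"udf-bytes")
```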


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
The thing is that I use the 
```PythonEnvironmentConfig.pythonTmpCachePath``` attribute to deliver 
information from the ```PythonStreamBinder``` to a class that is called from 
the python script. 
How would you suggest doing it otherwise?


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
The check failed because the spotbugs plugin found something; this plugin 
isn't run by default when calling `mvn verify`. You can run spotbugs 
locally by adding `-Dspotbugs` to the Maven invocation.

The found problem is the PythonEnvironmentConfig class, which contains 
public static non-final fields. I propose making these non-static and 
explicitly passing around a config object where needed.


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-21 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
I'm trying to track down the root cause of the check failures, without 
success. The project (flink-libraries/flink-streaming-python) on the 
master branch passes `verify` in my environment.

Please advise,


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-08-16 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
Regarding the exception - 
```java.io.IOException: java.io.IOException: The given HDFS file URI ...```

In general, using the Python interface requires a valid configuration of a 
shared file system (e.g. HDFS), which is designed to distribute the Python 
files. One can bypass this issue by setting the second argument to `True` 
when calling ```env.execute(...)``` in the Python script.
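A heavily hedged sketch of that workaround; the signature is inferred from this thread (a boolean second argument selecting local execution), not from current documentation:

```python
# A truthy "local" flag makes the job run in-process, so no shared file
# system is needed to ship the Python files to remote TaskManagers.
def execute_sketch(job_name, local=False):  # stand-in for env.execute(...)
    return "local execution" if local else "distributed via shared FS"

mode = execute_sketch("my_job", True)
```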


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
When running the example against a local cluster I got this exception:
```
java.io.IOException: java.io.IOException: The given HDFS file URI 
(hdfs:/tmp/flink_cache_-4117839671387669278) did not describe the HDFS 
NameNode. The attempt to use a default HDFS configuration, as specified in the 
'fs.hdfs.hdfsdefault' or 'fs.hdfs.hdfssite' config parameter failed due to the 
following problem: Either no default file system was registered, or the 
provided configuration contains no valid authority component (fs.default.name 
or fs.defaultFS) describing the (hdfs namenode) host and port.
```


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I can't seem to get the tests running.

In the IDE I get this exception:
```
null
Traceback (most recent call last):
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\plan.py",
 line 3, in <module>
run_all_testsae1bf92fc871d56dae4598b332a87804.main()
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\run_all_testsae1bf92fc871d56dae4598b332a87804.py",
 line 71, in main
Main().run()
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\run_all_testsae1bf92fc871d56dae4598b332a87804.py",
 line 45, in run
tests.append(__import__(test_module_name, globals(), locals()))
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\test_filter.py",
 line 25, in <module>
from utils.python_test_base import TestBase
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_7475612982544297474\utils\python_test_base.py",
 line 19, in <module>
from org.apache.flink.api.java.utils import ParameterTool
java.lang.NoClassDefFoundError: org/apache/flink/api/java/utils (wrong 
name: org/apache/flink/api/java/Utils)
at java.lang.ClassLoader.defineClass1(Native Method)
```

On the command-line the tests do run, but fail with this exception:
```
Submitting job ... 'test_filter'
Get execution environment
2> (50, u'hello')
null
Traceback (most recent call last):
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\plan.py",
 line 3, in <module>
run_all_testsae1bf92fc871d56dae4598b332a87804.main()
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\run_all_testsae1bf92fc871d56dae4598b332a87804.py",
 line 71, in main
Main().run()
  File 
"C:\Users\Zento\AppData\Local\Temp\flink_streaming_plan_-7332098182433468284\run_all_testsae1bf92fc871d56dae4598b332a87804.py",
 line 59, in run
print("\n{}\n{}\n{}\n".format('#'*len(ex_type), ex_type, 
'#'*len(ex_type)))
TypeError: object of type 'java.lang.Class' has no len()
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 17.835 sec 
<<< FAILURE! - in org.apache.flink.streaming.python.api.PythonStreamBinderTest
testJob(org.apache.flink.streaming.python.api.PythonStreamBinderTest)  Time 
elapsed: 17.393 sec  <<< FAILURE!
java.lang.AssertionError: Error while calling the test program: null
```


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-06-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
I'm gonna take a deeper look now, and play around with it a bit.

Be aware that if you rebase the branch again you will hit a myriad of 
checkstyle violations; we will have to suppress the method-name checks for 
this module.


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-05-22 Thread zohar-mizrahi
Github user zohar-mizrahi commented on the issue:

https://github.com/apache/flink/pull/3838
  
In the last change, I've rebased locally on top of origin/master, so I did 
`git push -f` to the master branch in my fork.


---


[GitHub] flink issue #3838: [FLINK-5886] Python API for streaming applications

2017-05-09 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3838
  
It may take a while until I can review this; the 1.3 feature freeze just 
kicked in and it's time to test all the new features in depth.


---