[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work
URL: https://github.com/apache/flink/pull/9766#discussion_r329333978
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coders.py
 ##
 @@ -0,0 +1,67 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from apache_beam.coders import Coder, VarIntCoder
+from apache_beam.coders.coders import FastCoder
+
+from pyflink.fn_execution import coder_impl
+from pyflink.fn_execution import flink_fn_execution_pb2
+
+FLINK_SCHEMA_CODER_URN = "flink:coder:schema:v1"
+
+
+class RowCoder(FastCoder):
+    """
+    Coder for Row.
+    """
+
+    def __init__(self, field_coders):
+        self._field_coders = field_coders
+
+    def _create_impl(self):
+        return coder_impl.RowCoderImpl([c.get_impl() for c in self._field_coders])
+
+    def is_deterministic(self):
+        return all(c.is_deterministic() for c in self._field_coders)
+
+    def to_type_hint(self):
+        from pyflink.table import Row
+        return Row
+
+    def __repr__(self):
 
 Review comment:
   Also implement `__eq__` and `__hash__` methods?
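   A minimal sketch of what the suggested methods could look like. To keep it runnable without Apache Beam, `RowCoder` below is a hypothetical stand-in that does not extend `FastCoder`, and `IntFieldCoder` is a made-up field coder; the sketch assumes the real field coders support `==` and `hash()` consistently.

```python
class IntFieldCoder(object):
    """Hypothetical field coder: all instances are interchangeable."""

    def __eq__(self, other):
        return type(self) == type(other)

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(type(self))


class RowCoder(object):
    """Stand-in for the RowCoder under review (no FastCoder base class)."""

    def __init__(self, field_coders):
        self._field_coders = field_coders

    def __repr__(self):
        return 'RowCoder[%s]' % ', '.join(repr(c) for c in self._field_coders)

    def __eq__(self, other):
        # Equal when both are RowCoders whose field coders are equal, in order.
        return (self.__class__ == other.__class__
                and self._field_coders == other._field_coders)

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__.
        return not self == other

    def __hash__(self):
        # Consistent with __eq__: equal field coder lists give equal hashes.
        return hash(tuple(self._field_coders))
```

   Defining `__hash__` together with `__eq__` matters on Python 3, where a class that overrides `__eq__` without `__hash__` becomes unhashable.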


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r329334111
 
 

 ##
 File path: flink-python/pyflink/fn_execution/coders.py
 ##
 @@ -0,0 +1,67 @@
+from apache_beam.coders import Coder, VarIntCoder
+from apache_beam.coders.coders import FastCoder
+
+from pyflink.fn_execution import coder_impl
+from pyflink.fn_execution import flink_fn_execution_pb2
+
+FLINK_SCHEMA_CODER_URN = "flink:coder:schema:v1"
+
 
 Review comment:
   `__all__ = ['RowCoder']`
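   A small standalone sketch of the effect: with `__all__` defined, a star-import only exposes `RowCoder`, not module-level constants such as `FLINK_SCHEMA_CODER_URN` or re-exported imports. The in-memory module name `coders_sketch` is hypothetical and stands in for `pyflink.fn_execution.coders`.

```python
import sys
import types

# Source mimicking the top of coders.py, with the suggested __all__ added.
source = '''
__all__ = ['RowCoder']

FLINK_SCHEMA_CODER_URN = "flink:coder:schema:v1"


class RowCoder(object):
    pass
'''

# Build the module in memory and register it so it can be imported.
module = types.ModuleType('coders_sketch')
exec(source, module.__dict__)
sys.modules['coders_sketch'] = module

# Simulate "from coders_sketch import *" in a fresh namespace.
namespace = {}
exec('from coders_sketch import *', namespace)
public_names = sorted(k for k in namespace if not k.startswith('__'))
print(public_names)  # ['RowCoder']
```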




[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r329309345
 
 

 ##
 File path: flink-python/pyflink/table/udf.py
 ##
 @@ -0,0 +1,217 @@
+import collections
+import functools
+import inspect
+from abc import ABCMeta, abstractmethod
+
+from pyflink.java_gateway import get_gateway
+from pyflink.table.types import DataType, _to_java_type
+from pyflink.util import utils
+
+__all__ = ['FunctionContext', 'ScalarFunction', 'udf']
+
+
+class FunctionContext(object):
+    """
+    Used to obtain global runtime information about the context in which the
+    user-defined function is executed. The information includes the metric group,
+    and global job parameters, etc.
+    """
+    pass
+
+
+class UserDefinedFunction(object):
+    """
+    Base interface for user-defined function.
+    """
+    __metaclass__ = ABCMeta
+
+    def open(self, function_context):
+        """
+        Initialization method for the function. It is called before the actual working methods
+        and thus suitable for one time setup work.
+
+        :param function_context: the context of the function
+        :type function_context: FunctionContext
+        """
+        pass
+
+    def close(self):
+        """
+        Tear-down method for the user code. It is called after the last call to the main
+        working methods.
+        """
+        pass
+
+    def is_deterministic(self):
+        """
+        Returns information about the determinism of the function's results.
+        It returns true if and only if a call to this function is guaranteed to
+        always return the same result given the same parameters. true is assumed by default.
+        If the function is not pure functional like random(), date(), now(),
+        this method must return false.
+
+        :return: the determinism of the function's results.
+        :rtype: bool
+        """
+        return True
+
+
+class ScalarFunction(UserDefinedFunction):
+    """
+    Base interface for user-defined scalar function. A user-defined scalar function maps zero, one,
+    or multiple scalar values to a new scalar value.
+    """
+
+    @abstractmethod
+    def eval(self, *args):
+        """
+        Method which defines the logic of the scalar function.
+        """
+        pass
+
+
+class LambdaScalarFunction(ScalarFunction):
+    """
+    Helper scalar function implementation for lambda function. It's for internal use only.
+    """
+
+    def __init__(self, func):
+        self.func = func
+
+    def eval(self, *args):
+        return self.func(*args)
+
+
+class UserDefinedFunctionWrapper(object):
+    """
+    Wrapper for Python user-defined function. It handles things like converting lambda
+    functions to user-defined functions, creating the Java user-defined function representation,
+    etc.
+    """
+
+    def __init__(self, func, input_types, result_type, name=None, deterministic=True):
+        if inspect.isclass(func) or (
+                not isinstance(func, UserDefinedFunction) and not callable(func)):
+            raise TypeError(
 
 Review comment:
   Add tests for these validations.
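   A sketch of what such tests could cover, one case per validation branch. To stay self-contained it re-implements the checks in a hypothetical `validate_udf_args` helper with stub `DataType` and `UserDefinedFunction` classes; in the real test suite these cases would go through `UserDefinedFunctionWrapper` (or `udf`) with real `DataTypes`. The stand-in uses `collections.abc.Iterable`, since the `collections.Iterable` alias was removed in Python 3.10.

```python
import collections.abc
import inspect
import unittest


class DataType(object):
    """Hypothetical stand-in for pyflink.table.types.DataType."""


class UserDefinedFunction(object):
    """Hypothetical stand-in for pyflink.table.udf.UserDefinedFunction."""


def validate_udf_args(func, input_types, result_type):
    # Mirrors the checks at the start of UserDefinedFunctionWrapper.__init__.
    if inspect.isclass(func) or (
            not isinstance(func, UserDefinedFunction) and not callable(func)):
        raise TypeError("Invalid function: not a function or callable "
                        "(__call__ is not defined): {0}".format(type(func)))
    if not isinstance(input_types, collections.abc.Iterable):
        input_types = [input_types]
    for input_type in input_types:
        if not isinstance(input_type, DataType):
            raise TypeError("Invalid input_type: input_type should be DataType "
                            "but contains {}".format(input_type))
    if not isinstance(result_type, DataType):
        raise TypeError("Invalid returnType: returnType should be DataType "
                        "but is {}".format(result_type))


class UdfValidationTests(unittest.TestCase):

    def test_class_instead_of_instance_is_rejected(self):
        with self.assertRaisesRegex(TypeError, "Invalid function"):
            validate_udf_args(UserDefinedFunction, DataType(), DataType())

    def test_non_callable_is_rejected(self):
        with self.assertRaisesRegex(TypeError, "Invalid function"):
            validate_udf_args(42, DataType(), DataType())

    def test_invalid_input_type_is_rejected(self):
        with self.assertRaisesRegex(TypeError, "Invalid input_type"):
            validate_udf_args(lambda x: x, [DataType(), "INT"], DataType())

    def test_invalid_result_type_is_rejected(self):
        with self.assertRaisesRegex(TypeError, "Invalid returnType"):
            validate_udf_args(lambda x: x, DataType(), "INT")

    def test_single_input_type_is_accepted(self):
        validate_udf_args(lambda x: x, DataType(), DataType())
```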




[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r329310180
 
 

 ##
 File path: flink-python/pyflink/table/udf.py
 ##
 @@ -0,0 +1,217 @@
+class UserDefinedFunctionWrapper(object):
+    """
+    Wrapper for Python user-defined function. It handles things like converting lambda
+    functions to user-defined functions, creating the Java user-defined function representation,
+    etc.
+    """
+
+    def __init__(self, func, input_types, result_type, name=None, deterministic=True):
+        if inspect.isclass(func) or (
+                not isinstance(func, UserDefinedFunction) and not callable(func)):
+            raise TypeError(
+                "Invalid function: not a function or callable (__call__ is not defined): "
+                "{0}".format(type(func)))
+
+        if not isinstance(input_types, collections.Iterable):
+            input_types = [input_types]
+
+        for input_type in input_types:
+            if not isinstance(input_type, DataType):
+                raise TypeError(
+                    "Invalid input_type: input_type should be DataType "
+                    "but contains {}".format(input_type))
+
+        if not isinstance(result_type, DataType):
+            raise TypeError(
+                "Invalid returnType: returnType should be DataType "
+                "but is {}".format(result_type))
+
+        self._func = func
+        self._input_types

[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r329304847
 
 

 ##
 File path: flink-python/pyflink/table/udf.py
 ##
 @@ -0,0 +1,217 @@
+class UserDefinedFunction(object):
+    """
+    Base interface for user-defined function.
+    """
+    __metaclass__ = ABCMeta
 
 Review comment:
   Can we remove this? The child classes implement it through subclassing instead of `register()`.
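   A small standalone sketch illustrating the point (all class names are made up). It assumes Python 3, where a class-body `__metaclass__` attribute is ignored (PEP 3115): the line neither enables `register()` nor enforces the `@abstractmethod` on `eval`, while plain subclassing is already enough for `isinstance` checks. If abstract-method enforcement is wanted on both Python 2 and 3, `six.with_metaclass(ABCMeta, object)` is one common option.

```python
from abc import ABCMeta, abstractmethod


class Py2StyleBase(object):
    # On Python 3 this is just an ordinary class attribute; the metaclass
    # stays `type`, so @abstractmethod below is not enforced.
    __metaclass__ = ABCMeta

    @abstractmethod
    def eval(self, *args):
        pass


class Py3StyleBase(metaclass=ABCMeta):

    @abstractmethod
    def eval(self, *args):
        pass


class NoEvalPy2(NoEvalBase := Py2StyleBase):
    pass


class NoEvalPy3(Py3StyleBase):
    pass


print(type(Py2StyleBase).__name__)  # type
print(type(Py3StyleBase).__name__)  # ABCMeta

NoEvalPy2()  # instantiates fine although eval() is not overridden
try:
    NoEvalPy3()
except TypeError as e:
    print("rejected:", e)

# Plain subclassing is enough for isinstance checks.
print(isinstance(NoEvalPy2(), Py2StyleBase))  # True
```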




[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r329310696
 
 

 ##
 File path: flink-python/pyflink/table/udf.py
 ##
 @@ -0,0 +1,217 @@
+class LambdaScalarFunction(ScalarFunction):
 
 Review comment:
   This class is used for both lambda and Python functions. Maybe we can change the name to `DelegatingScalarFunction`? (Also change the comments below.)
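   A sketch of the renamed helper, showing why the broader name fits: the same class delegates `eval()` to a lambda or to a regular `def` function. `DelegatingScalarFunction` is only the name proposed here, and `ScalarFunction` is a minimal hypothetical stand-in so the snippet runs on its own.

```python
class ScalarFunction(object):
    """Hypothetical minimal stand-in for pyflink.table.udf.ScalarFunction."""

    def eval(self, *args):
        raise NotImplementedError


class DelegatingScalarFunction(ScalarFunction):
    """
    Helper scalar function implementation which delegates eval() to a plain
    Python callable (a lambda or a regular function). For internal use only.
    """

    def __init__(self, func):
        self.func = func

    def eval(self, *args):
        return self.func(*args)


def add(i, j):
    return i + j


plus_one = DelegatingScalarFunction(lambda i: i + 1)  # wraps a lambda
adder = DelegatingScalarFunction(add)                 # wraps a named function
```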




[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-28 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r329295245
 
 

 ##
 File path: flink-python/pyflink/table/table_environment.py
 ##
 @@ -542,6 +542,36 @@ def register_java_function(self, name, function_class_name):
             .loadClass(function_class_name).newInstance()
         self._j_tenv.registerFunction(name, java_function)
 
+    def register_function(self, name, function):
 
 Review comment:
   Previously, `test_environment_completeness.py` ignored this method when checking the API alignment between Python and Java. We should check this method from now on.
   
   We can just remove `registerFunction` from the ignored methods in `test_environment_completeness.py`.
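   A sketch of the idea behind such a completeness check, to show why dropping the exclusion matters: every Java method name is converted to snake_case and must exist on the Python side unless it is excluded. `to_snake_case` and `missing_python_methods` are hypothetical helpers; the actual test utilities in PyFlink may be structured differently.

```python
import re


def to_snake_case(java_name):
    # registerFunction -> register_function
    return re.sub(r'(?<!^)(?=[A-Z])', '_', java_name).lower()


def missing_python_methods(java_methods, python_methods, excluded_methods):
    """Return the Java methods that have no snake_case counterpart in Python."""
    return sorted(
        m for m in java_methods
        if m not in excluded_methods and to_snake_case(m) not in python_methods)


java_methods = {'registerFunction', 'sqlQuery'}

# Before the PR: the missing method is hidden by the exclusion.
before = missing_python_methods(java_methods, {'sql_query'}, {'registerFunction'})
# Without the exclusion the gap would be reported...
unexcluded = missing_python_methods(java_methods, {'sql_query'}, set())
# ...and once register_function exists, the check passes.
after = missing_python_methods(java_methods, {'sql_query', 'register_function'}, set())
```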




[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r328670607
 
 

 ##
 File path: flink-python/MANIFEST.in
 ##
 @@ -29,3 +29,4 @@ include README.md
 include pyflink/LICENSE
 include pyflink/NOTICE
 include pyflink/README.txt
+graft pyflink/proto
 
 Review comment:
   It seems we don't need this change. Isn't the proto dir already included in pyflink.zip?
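   One quick way to verify this is to list the archive entries under `pyflink/proto/`. The sketch below uses only the standard `zipfile` module; the `proto_entries` helper is hypothetical, and the archive is built in memory with an assumed file name purely for illustration (in practice you would pass the path of the built pyflink.zip).

```python
import io
import zipfile


def proto_entries(archive, prefix='pyflink/proto/'):
    """Return the archive entries that live under `prefix`."""
    with zipfile.ZipFile(archive) as zf:
        return sorted(n for n in zf.namelist() if n.startswith(prefix))


# Illustrative in-memory archive; the .proto file name is an assumption.
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('pyflink/__init__.py', '')
    zf.writestr('pyflink/proto/flink-fn-execution.proto', 'syntax = "proto3";')

entries = proto_entries(buf)
```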




[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r328685073
 
 

 ##
 File path: flink-python/pyflink/fn_execution/operations.py
 ##
 @@ -0,0 +1,261 @@
+from abc import abstractmethod, ABCMeta
+
+from apache_beam.runners.worker import operation_specs
+from apache_beam.runners.worker import bundle_processor
+from apache_beam.runners.worker.operations import Operation
+
+from pyflink.fn_execution import flink_fn_execution_pb2
+
+SCALAR_FUNCTION_URN = "flink:transform:scalar_function:v1"
+
+
+class InputGetter(object):
+    """
+    Base class for getting an input argument for a :class:`UserDefinedFunction`.
+    """
+    __metaclass__ = ABCMeta
+
+    def open(self):
+        pass
+
+    def close(self):
+        pass
+
+    @abstractmethod
+    def get(self, value):
+        pass
+
+
+class OffsetInputGetter(InputGetter):
+    """
+    InputGetter for the input argument which is a column of the input row.
+
+    :param input_offset: the offset of the column in the input row
+    """
+
+    def __init__(self, input_offset):
+        self.input_offset = input_offset
+
+    def get(self, value):
+        return value[self.input_offset]
+
+
+class ScalarFunctionInputGetter(InputGetter):
+    """
+    InputGetter for the input argument which is a Python :class:`ScalarFunction`. This is used for
+    chaining Python functions.
+
+    :param scalar_function_proto: the proto representation of the Python :class:`ScalarFunction`
+    """
+
+    def __init__(self, scalar_function_proto):
+        self.scalar_function_invoker = create_scalar_function_invoker(scalar_function_proto)
+
+    def open(self):
+        self.scalar_function_invoker.invoke_open()
+
+    def close(self):
+        self.scalar_function_invoker.invoke_close()
+
+    def get(self, value):
+        return self.scalar_function_invoker.invoke_eval(value)
+
+
+class ScalarFunctionInvoker(object):
+    """
+    An abstraction that can be used to execute :class:`ScalarFunction` methods.
+
+    A ScalarFunctionInvoker describes a particular way for invoking methods of a
+    :class:`ScalarFunction`.
+
+    :param scalar_function: the :class:`ScalarFunction` to execute
+    :param inputs: the input arguments for the :class:`ScalarFunction`
+    """
+
+    def __init__(self, scalar_function, inputs):
+        self.scalar_function = scalar_function
+        self.input_getters = []
+        for input in inputs:
+            if input.HasField("udf"):
+                # for chaining Python UDF input: the input argument is a Python ScalarFunction
+                self.input_getters.append(ScalarFunctionInputGetter(input.udf))
+            else:
+                # the input argument is a column of the input row
+                self.input_getters.append(OffsetInputGetter(input.inputOffset))
+
+    def invoke_open(self):
+        """
+        Invokes the ScalarFunction.open() function.
+        """
+        for input_getter in self.input_getters:
+            input_getter.open()
+        # set the FunctionContext to None for now
+        self.scalar_function.open(None)
+
+    def invoke_close(self):
+        """
+        Invokes the ScalarFunction.close() function.
+        """
+        for input_getter in self.input_getters:
+            input_getter.close()
+        self.scalar_function.close()
+
+    def invoke_eval(self, value):
+        """
+        Invokes the ScalarFunction.eval() function.
+
+        :param value: the input element for which eval() method should be invoked
+        """
+        args = [input_getter.get(value) for input_getter in self.input_getters]
+        return self.scalar_function.eval(*args)
+
+
+def create_scalar_function_invoker(scalar_function_proto):
+    """
+    Creates :class:`ScalarFunctionInvoker` from the proto representation of a
+    :class:`ScalarFunction`.
+

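   To make the chaining in `ScalarFunctionInvoker` above concrete, here is a simplified, self-contained sketch evaluating `add(multiply(col0, col1), col2)` on one row. It is hypothetical in two ways: getters are built directly from a ready-made invoker instead of from a proto, and `func` is a plain callable rather than a `ScalarFunction`, so `open()`/`close()` are omitted.

```python
class OffsetInputGetter(object):
    """Reads one column of the input row."""

    def __init__(self, input_offset):
        self.input_offset = input_offset

    def get(self, value):
        return value[self.input_offset]


class ScalarFunctionInputGetter(object):
    """Feeds the result of another (chained) invoker in as an argument."""

    def __init__(self, invoker):
        # Hypothetical: takes an invoker directly instead of a proto.
        self.invoker = invoker

    def get(self, value):
        return self.invoker.invoke_eval(value)


class ScalarFunctionInvoker(object):
    """Simplified invoker: `func` is a plain callable."""

    def __init__(self, func, input_getters):
        self.func = func
        self.input_getters = input_getters

    def invoke_eval(self, value):
        args = [getter.get(value) for getter in self.input_getters]
        return self.func(*args)


# multiply(col0, col1)
inner = ScalarFunctionInvoker(
    lambda a, b: a * b, [OffsetInputGetter(0), OffsetInputGetter(1)])
# add(multiply(col0, col1), col2)
outer = ScalarFunctionInvoker(
    lambda a, b: a + b, [ScalarFunctionInputGetter(inner), OffsetInputGetter(2)])

result = outer.invoke_eval((2, 3, 4))  # (2 * 3) + 4
```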
[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r328687911
 
 

 ##
 File path: NOTICE-binary
 ##
 @@ -8035,6 +8035,7 @@ See bundled license files for details
 - com.google.protobuf:protobuf-java:3.7.1
 - com.google.protobuf:protobuf-java-util:3.7.1
 - com.google.auth:google-auth-library-credentials:0.13.0
+- cloudpickle:1.2.2
 
 Review comment:
   As cloudpickle is part of the source distribution, we also need to add cloudpickle to the NOTICE file in the project root directory.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
URL: https://github.com/apache/flink/pull/9766#discussion_r328664561
 
 

 ##
 File path: flink-python/pyflink/gen_protos.py
 ##
 @@ -0,0 +1,143 @@
+from __future__ import absolute_import
+from __future__ import print_function
+
+import glob
+import logging
+import multiprocessing
+import os
+import platform
+import shutil
+import subprocess
+import sys
+import time
+import warnings
+
+import pkg_resources
+
+# latest grpcio-tools incompatible with latest protobuf 3.6.1.
+GRPC_TOOLS = 'grpcio-tools>=1.3.5,<=1.14.2'
+
+PROTO_PATHS = [
+    os.path.join('proto'),
+]
+
+PYTHON_OUTPUT_PATH = os.path.join('fn_execution')
+
+
+def generate_proto_files(force=False):
+    try:
+        import grpc_tools  # noqa  # pylint: disable=unused-import
+    except ImportError:
+        warnings.warn('Installing grpcio-tools is recommended for development.')
+
+    py_sdk_root = os.path.dirname(os.path.abspath(__file__))
+    proto_dirs = [os.path.join(py_sdk_root, path) for path in PROTO_PATHS]
+    proto_files = sum(
+        [glob.glob(os.path.join(d, '*.proto')) for d in proto_dirs], [])
+    out_dir = os.path.join(py_sdk_root, PYTHON_OUTPUT_PATH)
+    out_files = [path for path in glob.glob(os.path.join(out_dir, '*_pb2.py'))]
+
+    if out_files and not proto_files and not force:
+        # We have out_files but no protos; assume they're up to date.
+        # This is actually the common case (e.g. installation from an sdist).
+        logging.info('No proto files; using existing generated files.')
+        return
+
+    elif not out_files and not proto_files:
+        raise RuntimeError(
+            'No proto files found in %s.' % proto_dirs)
+
+    # Regenerate iff the proto files or this file are newer.
+    elif force or not out_files or len(out_files) < len(proto_files) or (
+            min(os.path.getmtime(path) for path in out_files)
+            <= max(os.path.getmtime(path)
+                   for path in proto_files + [os.path.realpath(__file__)])):
+        try:
+            from grpc_tools import protoc
+        except ImportError:
+            if platform.system() == 'Windows':
+                # For Windows, grpcio-tools has to be installed manually.
+                raise RuntimeError(
+                    'Cannot generate protos for Windows since grpcio-tools package is '
+                    'not installed. Please install this package manually '
+                    'using \'pip install grpcio-tools\'.')
+
+            # Use a subprocess to avoid messing with this process' path and imports.
+            # Note that this requires a separate module from setup.py for Windows:
+            # https://docs.python.org/2/library/multiprocessing.html#windows
+            p = multiprocessing.Process(
+                target=_install_grpcio_tools_and_generate_proto_files)
+            p.start()
+            p.join()
+            if p.exitcode:
+                raise ValueError("Proto generation failed (see log for details).")
+        else:
+            logging.info('Regenerating out-of-date Python proto definitions.')
+            builtin_protos = pkg_resources.resource_filename('grpc_tools', '_proto')
+            args = (
+                [sys.executable] +  # expecting to be called from command line
+                ['--proto_path=%s' % builtin_protos] +
+                ['--proto_path=%s' % d for d in proto_dirs] +
+                ['--python_out=%s' % out_dir] +
+                proto_files)
+            ret_code = protoc.main(args)
+            if ret_code:
+                raise RuntimeError(
+                    'Protoc returned non-zero status (see logs for details): '
+                    '%s' % ret_code)
+
+
+# Though wheels are available for grpcio-tools, setup_requires uses
+# easy_install which doesn't understand them.  This means that it

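   The staleness rule in `generate_proto_files` above ("regenerate iff the proto files or this file are newer") can be isolated into a small predicate. This is a sketch: `needs_regeneration` is a hypothetical helper, not part of gen_protos.py, and it leaves out the early-return and error branches for missing proto or output files.

```python
import os


def needs_regeneration(out_files, proto_files, generator_file, force=False):
    """True if the generated *_pb2.py files may be stale."""
    if force or not out_files or len(out_files) < len(proto_files):
        return True
    # Stale when the oldest output is not newer than the newest input
    # (the .proto files plus the generator script itself).
    oldest_output = min(os.path.getmtime(p) for p in out_files)
    newest_input = max(os.path.getmtime(p) for p in proto_files + [generator_file])
    return oldest_output <= newest_input
```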
[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] 
Add Python building blocks to make sure the basic functionality of Python 
ScalarFunction could work
URL: https://github.com/apache/flink/pull/9766#discussion_r328537992
 
 

 ##
 File path: flink-python/src/main/java/org/apache/flink/python/AbstractPythonFunctionRunner.java
 ##
 @@ -280,13 +281,16 @@ private static String randomString(Random random) {
 */
protected RunnerApi.Environment createPythonExecutionEnvironment() {
if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
-   final Map env = new HashMap<>(2);
-   env.put("python", pythonEnv.getPythonExec());
+   final Map env = new HashMap<>(1);
 
 Review comment:
   We don't need this env. Could we just pass null to the `Environments.createProcessEnvironment()` method instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] 
Add Python building blocks to make sure the basic functionality of Python 
ScalarFunction could work
URL: https://github.com/apache/flink/pull/9766#discussion_r328514073
 
 

 ##
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCalc.scala
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo, SimplePythonFunction}
+import org.apache.flink.table.functions.utils.ScalarSqlFunction
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+trait CommonPythonCalc {
+
+  private[flink] def extractPythonScalarFunctionInfos(
+  rexCalls: Array[RexCall]): (Array[Int], Array[PythonFunctionInfo]) = {
+// using LinkedHashMap to keep the insert order
+val inputNodes = new mutable.LinkedHashMap[RexNode, Integer]()
+val pythonFunctionInfos = rexCalls.map(createPythonScalarFunctionInfo(_, inputNodes))
+
+val udfInputOffsets = inputNodes.toArray.sortBy(_._2).map(_._1).map {
 
 Review comment:
   We don't need to sort here, as the LinkedHashMap already preserves the insertion order.
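
The point above is easier to see in a minimal sketch. It assumes each newly seen input is assigned the map's current size as its offset; that assignment happens in `createPythonScalarFunctionInfo`, which this hunk does not show. Python dicts (3.7+) keep insertion order the way `LinkedHashMap` does. `collect_udf_inputs` is a hypothetical helper, and plain ints stand in for `RexInputRef` indices.

```python
def collect_udf_inputs(calls):
    """Assign each distinct input field an offset in first-seen order.

    `calls` is a list of argument lists; each argument is an input field
    index (a stand-in for RexInputRef.getIndex). The dict plays the role
    of the LinkedHashMap in the quoted Scala code.
    """
    input_nodes = {}  # field index -> offset in the row sent to Python
    call_offsets = []
    for args in calls:
        offsets = []
        for field_index in args:
            # Reuse the existing offset if this field was already seen;
            # otherwise assign the next offset (the current map size).
            offset = input_nodes.setdefault(field_index, len(input_nodes))
            offsets.append(offset)
        call_offsets.append(offsets)
    # Offsets were assigned as 0, 1, 2, ... in insertion order, so iterating
    # the dict already yields fields sorted by offset; no sortBy is needed.
    udf_input_offsets = list(input_nodes)
    return udf_input_offsets, call_offsets


if __name__ == "__main__":
    print(collect_udf_inputs([[3, 1], [1, 5]]))  # ([3, 1, 5], [[0, 1], [1, 2]])
```
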



[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] 
Add Python building blocks to make sure the basic functionality of Python 
ScalarFunction could work
URL: https://github.com/apache/flink/pull/9766#discussion_r328533791
 
 

 ##
 File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/CommonPythonCalc.scala
 ##
 @@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+import org.apache.flink.table.functions.FunctionLanguage
+import org.apache.flink.table.functions.python.{PythonFunction, PythonFunctionInfo, SimplePythonFunction}
+import org.apache.flink.table.functions.utils.ScalarSqlFunction
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+
+trait CommonPythonCalc {
+
+  private[flink] def extractPythonScalarFunctionInfos(
+  rexCalls: Array[RexCall]): (Array[Int], Array[PythonFunctionInfo]) = {
+// using LinkedHashMap to keep the insert order
+val inputNodes = new mutable.LinkedHashMap[RexNode, Integer]()
+val pythonFunctionInfos = rexCalls.map(createPythonScalarFunctionInfo(_, inputNodes))
+
+val udfInputOffsets = inputNodes.toArray.sortBy(_._2).map(_._1).map {
+  case inputRef: RexInputRef => inputRef.getIndex
+}
+(udfInputOffsets, pythonFunctionInfos)
+  }
+
+  private[flink] def createPythonScalarFunctionInfo(
+  rexCall: RexCall,
+  inputNodes: mutable.Map[RexNode, Integer]): PythonFunctionInfo = rexCall.getOperator match {
+case sfc: ScalarSqlFunction if sfc.getScalarFunction.getLanguage == FunctionLanguage.PYTHON =>
+  val inputs = new mutable.ArrayBuffer[AnyRef]()
+  rexCall.getOperands.foreach {
+case pythonRexCall: RexCall if pythonRexCall.getOperator.asInstanceOf[ScalarSqlFunction]
+  .getScalarFunction.getLanguage == FunctionLanguage.PYTHON =>
+  // Continuous Python UDFs can be chained together
+  val argPythonInfo = createPythonScalarFunctionInfo(pythonRexCall, inputNodes)
+  inputs.append(argPythonInfo)
+
+case argNode: RexNode =>
 
 Review comment:
   Should we throw an exception with a meaningful message here if the RexNode is a RexLiteral? We don't support Python UDFs with literal arguments yet.
   
   For example, we could do the check and throw the exception in DataStreamPythonCalcRule, so that it can be verified with a plan test. Or could we throw the exception even earlier?
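
Here is a minimal sketch of the check suggested above. It walks a call and any chained Python UDF calls, and fails with a descriptive message as soon as a literal argument appears. The dataclasses are hypothetical stand-ins for `RexInputRef`, `RexLiteral` and a `RexCall` to a Python `ScalarFunction`. Where the real check should live (for example `DataStreamPythonCalcRule`) is the open question in the comment, and the sketch does not settle it.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class InputRef:      # hypothetical stand-in for RexInputRef
    index: int


@dataclass
class Literal:       # hypothetical stand-in for RexLiteral
    value: object


@dataclass
class PythonCall:    # hypothetical stand-in for a RexCall to a Python UDF
    name: str
    operands: List["Node"]


Node = Union[InputRef, Literal, PythonCall]


def validate_python_call(call: PythonCall) -> None:
    """Reject literal arguments anywhere in a chain of Python UDF calls."""
    for operand in call.operands:
        if isinstance(operand, PythonCall):
            # Chained Python UDFs: validate the nested call as well.
            validate_python_call(operand)
        elif isinstance(operand, Literal):
            raise ValueError(
                "Python UDF '%s' does not support literal arguments yet "
                "(got %r)." % (call.name, operand.value))
        elif not isinstance(operand, InputRef):
            raise TypeError("Unexpected operand: %r" % (operand,))
```

Naming the offending UDF in the message lets the failure be asserted on directly in a plan test.
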



[GitHub] [flink] hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-26 Thread GitBox
hequn8128 commented on a change in pull request #9766: [FLINK-14018][python] 
Add Python building blocks to make sure the basic functionality of Python 
ScalarFunction could work
URL: https://github.com/apache/flink/pull/9766#discussion_r328471344
 
 

 ##
 File path: flink-python/bin/pyflink-udf-runner.sh
 ##
 @@ -32,16 +32,35 @@ if [[ "$python" = "" ]]; then
 python="python"
 fi
 
-# Add pyflink & py4j to PYTHONPATH
-PYFLINK_ZIP="$FLINK_OPT_DIR/python/pyflink.zip"
-if [[ ! ${PYTHONPATH} =~ ${PYFLINK_ZIP} ]]; then
+CURRENT_DIR=`pwd -P`
 
 Review comment:
   This script runs on the worker. Please add some comments explaining why this code is added here. It took me some time to figure out that it was added to make debugging easier.
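
For context on the removed lines: they add `pyflink.zip` to PYTHONPATH only when a `=~` match says it is not already there. Below is a minimal Python sketch of that idempotent update, for illustration only. The script itself is bash, and prepending (rather than appending) is an assumption, because the body of the removed `if` is not shown. `prepend_to_pythonpath` is a hypothetical helper.

```python
import os


def prepend_to_pythonpath(pythonpath, entry):
    """Return `pythonpath` with `entry` prepended, unless already present.

    Unlike bash's `=~` (a regex match anywhere in the string), this compares
    whole path entries, so '/opt/pyflink.zip' is not considered present in
    '/opt/pyflink.zip.bak'.
    """
    entries = [p for p in pythonpath.split(os.pathsep) if p]
    if entry in entries:
        return pythonpath
    return os.pathsep.join([entry] + entries)
```

A call site would pass `os.environ.get("PYTHONPATH", "")` and write the result back to the environment before starting the worker.
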

