Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19630#discussion_r150696878

--- Diff: python/pyspark/sql/functions.py ---
@@ -2049,132 +2051,13 @@ def map_values(col):
 
 # ---------------------------- User Defined Function ----------------------------------
 
-def _wrap_function(sc, func, returnType):
-    command = (func, returnType)
-    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
-    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
-                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)
-
-
-class PythonUdfType(object):
-    # row-at-a-time UDFs
-    NORMAL_UDF = 0
-    # scalar vectorized UDFs
-    PANDAS_UDF = 1
-    # grouped vectorized UDFs
-    PANDAS_GROUPED_UDF = 2
-
-
-class UserDefinedFunction(object):
-    """
-    User defined function in Python
-
-    .. versionadded:: 1.3
-    """
-    def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
-        if not callable(func):
-            raise TypeError(
-                "Not a function or callable (__call__ is not defined): "
-                "{0}".format(type(func)))
-
-        self.func = func
-        self._returnType = returnType
-        # Stores UserDefinedPythonFunctions jobj, once initialized
-        self._returnType_placeholder = None
-        self._judf_placeholder = None
-        self._name = name or (
-            func.__name__ if hasattr(func, '__name__')
-            else func.__class__.__name__)
-        self.pythonUdfType = pythonUdfType
-
-    @property
-    def returnType(self):
-        # This makes sure this is called after SparkContext is initialized.
-        # ``_parse_datatype_string`` accesses to JVM for parsing a DDL formatted string.
-        if self._returnType_placeholder is None:
-            if isinstance(self._returnType, DataType):
-                self._returnType_placeholder = self._returnType
-            else:
-                self._returnType_placeholder = _parse_datatype_string(self._returnType)
-        return self._returnType_placeholder
-
-    @property
-    def _judf(self):
-        # It is possible that concurrent access, to newly created UDF,
-        # will initialize multiple UserDefinedPythonFunctions.
-        # This is unlikely, doesn't affect correctness,
-        # and should have a minimal performance impact.
-        if self._judf_placeholder is None:
-            self._judf_placeholder = self._create_judf()
-        return self._judf_placeholder
-
-    def _create_judf(self):
-        from pyspark.sql import SparkSession
-
-        spark = SparkSession.builder.getOrCreate()
-        sc = spark.sparkContext
-
-        wrapped_func = _wrap_function(sc, self.func, self.returnType)
-        jdt = spark._jsparkSession.parseDataType(self.returnType.json())
-        judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
-            self._name, wrapped_func, jdt, self.pythonUdfType)
-        return judf
-
-    def __call__(self, *cols):
-        judf = self._judf
-        sc = SparkContext._active_spark_context
-        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
-
-    def _wrapped(self):
-        """
-        Wrap this udf with a function and attach docstring from func
-        """
-
-        # It is possible for a callable instance without __name__ attribute or/and
-        # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
-        # we should avoid wrapping the attributes from the wrapped function to the wrapper
-        # function. So, we take out these attribute names from the default names to set and
-        # then manually assign it after being wrapped.
-        assignments = tuple(
-            a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')
-
-        @functools.wraps(self.func, assigned=assignments)
-        def wrapper(*args):
-            return self(*args)
-
-        wrapper.__name__ = self._name
-        wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
-                              else self.func.__class__.__module__)
-
-        wrapper.func = self.func
-        wrapper.returnType = self.returnType
-        wrapper.pythonUdfType = self.pythonUdfType
-
-        return wrapper
-
-
-def _create_udf(f, returnType, pythonUdfType):
-
-    def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
-        if pythonUdfType == PythonUdfType.PANDAS_UDF:
-            import inspect
-            argspec = inspect.getargspec(f)
-            if len(argspec.args) == 0 and argspec.varargs is None:
-                raise ValueError(
-                    "0-arg pandas_udfs are not supported. "
-                    "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
-                )
-        udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
-        return udf_obj._wrapped()
-
-    # decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
-    if f is None or isinstance(f, (str, DataType)):
-        # If DataType has been passed as a positional argument
-        # for decorator use it as a returnType
-        return_type = f or returnType
-        return functools.partial(_udf, returnType=return_type, pythonUdfType=pythonUdfType)
-    else:
-        return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
+class PandasUDFType(enum.Enum):
--- End diff --

I can't take a closer look right now, but please let me know @icexelloss if I missed your intention. I'll take a closer look tonight.
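The removed `returnType` and `_judf` properties both use the same lazy-placeholder pattern: parsing a DDL type string (or building the Java UDF) needs a live SparkContext, so the work is deferred to first access instead of being done in `__init__`. A minimal standalone sketch of that pattern, with a string-upper stand-in for the expensive JVM call (`LazyField` is an illustrative name, not PySpark's):

```python
# A minimal sketch (illustrative name, not PySpark's) of the lazy-placeholder
# pattern in the removed ``returnType``/``_judf`` properties: the expensive
# step runs on first access, not in __init__.
class LazyField(object):
    def __init__(self, spec):
        self._spec = spec
        self._placeholder = None  # filled on first access

    @property
    def value(self):
        if self._placeholder is None:
            # Stand-in for the expensive call (e.g. _parse_datatype_string),
            # which needs a live SparkContext/JVM in the real code.
            self._placeholder = self._spec.upper()
        return self._placeholder


f = LazyField("string")
print(f._placeholder)  # None -- nothing computed yet
print(f.value)         # 'STRING' -- computed on first access
print(f._placeholder)  # 'STRING' -- cached thereafter
```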
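The `assignments` filtering in the removed `_wrapped` exists because callables such as `functools.partial` carry no `__name__` or `__module__`, so those two names are dropped from what `functools.wraps` copies and then assigned by hand with a class-attribute fallback. A minimal sketch of just that trick outside PySpark (`wrap_callable` is an illustrative name):

```python
import functools

# Sketch of the attribute-filtering trick from the removed ``_wrapped``:
# exclude __name__/__module__ from what functools.wraps copies, then set
# them manually, falling back to the class when the callable lacks them.
def wrap_callable(func, name=None):
    assignments = tuple(
        a for a in functools.WRAPPER_ASSIGNMENTS
        if a not in ('__name__', '__module__'))

    @functools.wraps(func, assigned=assignments)
    def wrapper(*args):
        return func(*args)

    wrapper.__name__ = name or (
        func.__name__ if hasattr(func, '__name__')
        else func.__class__.__name__)
    wrapper.__module__ = (func.__module__ if hasattr(func, '__module__')
                          else func.__class__.__module__)
    return wrapper


# functools.partial instances have neither __name__ nor __module__:
doubled = wrap_callable(functools.partial(lambda x, y: x * y, 2))
print(doubled.__name__)  # 'partial' -- fell back to the class name
print(doubled(21))       # 42
```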
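The removed `_create_udf` handles both decorator forms with one entry point: called as `@udf`, the first argument is the function itself; called as `@udf(returnType)` or with a DDL string, the first argument is the type, so it returns a `functools.partial` that becomes the real one-argument decorator. A self-contained sketch of that dispatch under illustrative names (`my_udf`, `_build`):

```python
import functools

# Standalone sketch of the dual-form decorator dispatch used in the
# removed ``_create_udf`` (names here are illustrative, not PySpark's).
def my_udf(f=None, return_type="string"):
    def _build(f, return_type):
        def wrapper(*args):
            return f(*args)
        wrapper.returnType = return_type
        return wrapper

    # Bare ``@my_udf`` passes the function itself; ``@my_udf("int")`` or
    # ``@my_udf()`` passes a type (or nothing) first, so hand back a
    # one-argument decorator via functools.partial.
    if f is None or isinstance(f, str):
        return functools.partial(_build, return_type=f or return_type)
    return _build(f, return_type)


@my_udf                 # bare decorator form
def plus_one(x):
    return x + 1

@my_udf("int")          # parameterized decorator form
def times_two(x):
    return x * 2

print(plus_one(1), plus_one.returnType)    # 2 string
print(times_two(3), times_two.returnType)  # 6 int
```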
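The hunk cuts off at the new `class PandasUDFType(enum.Enum):` header, so its members are not visible here. Purely as a hedged sketch, assuming the enum mirrors the three integer constants of the removed `PythonUdfType` class (the member names below are assumptions, not the PR's actual contents), one consequence of moving from plain ints to `enum.Enum` is that members no longer compare equal to raw integers:

```python
import enum

# Rough sketch only: the hunk ends at the ``class PandasUDFType`` header,
# so these members are assumptions modeled on the removed PythonUdfType
# constants, not the actual contents of the PR.
class PandasUDFType(enum.Enum):
    NORMAL_UDF = 0          # row-at-a-time UDFs
    PANDAS_UDF = 1          # scalar vectorized UDFs
    PANDAS_GROUPED_UDF = 2  # grouped vectorized UDFs


# Enum members are singletons and compare by identity, not by raw value:
print(PandasUDFType.PANDAS_UDF is PandasUDFType(1))  # True
print(PandasUDFType.PANDAS_UDF == 1)                 # False
```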