[ https://issues.apache.org/jira/browse/SPARK-23645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stu (Michael Stewart) updated SPARK-23645:
------------------------------------------

Description:

pandas_udf (seemingly all Python UDFs) cannot be called with keyword arguments, because in `pyspark/sql/udf.py` the class `UserDefinedFunction` defines `__call__`, along with its wrapper utility method, to accept positional args only:

@ line 168:
{code:python}
...
def __call__(self, *cols):
    judf = self._judf
    sc = SparkContext._active_spark_context
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

# This function is for improving the online help system in the interactive interpreter.
# For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
# argument annotation. (See: SPARK-19161)
def _wrapped(self):
    """
    Wrap this udf with a function and attach docstring from func
    """

    # It is possible for a callable instance without __name__ attribute or/and
    # __module__ attribute to be wrapped here. For example, functools.partial. In this case,
    # we should avoid wrapping the attributes from the wrapped function to the wrapper
    # function. So, we take out these attribute names from the default names to set and
    # then manually assign it after being wrapped.
    assignments = tuple(
        a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__')

    @functools.wraps(self.func, assigned=assignments)
    def wrapper(*args):
        return self(*args)
...
{code}
as seen in:
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit

spark = SparkSession.builder.getOrCreate()

df = spark.range(12).withColumn('b', col('id') * 2)

def ok(a, b):
    return a * b

# positional call: no problems
df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', 'b')).show()
# keyword call: fails with ~no stacktrace, thanks to the wrapper helper
df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')(a='id', b='b')).show()
{code}
*discourse*: it isn't difficult to swap the kwargs back in, allowing the UDF to be called that way, but the cols tuple passed inside `__call__`:
{code:python}
_to_seq(sc, cols, _to_java_column)
{code}
has to be ordered according to the function's declared arguments, or the UDF will silently return incorrect results. So the challenge here is to:

(a) reconstruct the proper order of the full args/kwargs: positional args first, then kwargs, taken not in the order they were passed but in the order the function declares them

(b) handle Python 2 vs Python 3 `inspect` module inconsistencies

A sketch of one possible approach follows.
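To make (a) and (b) concrete, here is a minimal sketch of the reordering logic. The helper names `_arg_names` and `_order_cols` are hypothetical, not part of PySpark, and the sketch assumes the UDF's Python function declares only plain positional parameters, all of which are supplied (defaults and `functools.partial` are not handled):
{code:python}
import inspect
import sys


def _arg_names(func):
    # (b): Python 3 deprecates inspect.getargspec in favour of
    # inspect.getfullargspec, so select the API per interpreter version.
    if sys.version_info[0] >= 3:
        return inspect.getfullargspec(func).args
    return inspect.getargspec(func).args


def _order_cols(func, args, kwargs):
    # (a): positional args keep their slots; keyword args are appended in
    # the order declared by func, not the order in which they were passed.
    names = _arg_names(func)
    tail = names[len(args):]
    missing = [n for n in tail if n not in kwargs]
    unexpected = sorted(set(kwargs) - set(tail))
    if missing or unexpected:
        raise TypeError('missing args %s; unexpected args %s'
                        % (missing, unexpected))
    return tuple(args) + tuple(kwargs[n] for n in tail)


def ok(a, b):
    return a * b

# Keywords passed out of declared order come back in declared order:
assert _order_cols(ok, (), {'b': 'b', 'a': 'id'}) == ('id', 'b')
# Mixed positional + keyword:
assert _order_cols(ok, ('id',), {'b': 'b'}) == ('id', 'b')
{code}
Plugged into `UserDefinedFunction`, the change would then look roughly like this (again a sketch under the same assumptions, not a tested patch):
{code:python}
# Inside UserDefinedFunction in pyspark/sql/udf.py:
def __call__(self, *cols, **kwcols):
    # Reorder keyword columns into the declared argument order before
    # handing them to the JVM side.
    cols = _order_cols(self.func, cols, kwcols)
    judf = self._judf
    sc = SparkContext._active_spark_context
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

# ...and _wrapped's inner wrapper must forward kwargs as well:
@functools.wraps(self.func, assigned=assignments)
def wrapper(*args, **kwargs):
    return self(*args, **kwargs)
{code}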
> pandas_udf can not be called with keyword arguments
> ---------------------------------------------------
>
>                 Key: SPARK-23645
>                 URL: https://issues.apache.org/jira/browse/SPARK-23645
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: python 3.6 | pyspark 2.3.0 | Using Scala version 2.11.8, OpenJDK 64-Bit Server VM, 1.8.0_141
>            Reporter: Stu (Michael Stewart)
>            Priority: Minor
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org