There are several suggestions on this SOF
https://stackoverflow.com/questions/38984775/spark-errorexpected-zero-arguments-for-construction-of-classdict-for-numpy-cor

1

You need to convert the final value to a python list. You implement the
function as follows:

def uniq_array(col_array):
    x = np.unique(col_array)
    return list(x)

This is because Spark doesn't understand the numpy array format. In order
to feed a python object that Spark DataFrames understand as an ArrayType,
you need to convert the output to a python list before returning it.




The source of the problem is that object returned from the UDF doesn't
conform to the declared type. np.unique not only returns numpy.ndarray but
also converts numerics to the corresponding NumPy types which are not
compatible <https://issues.apache.org/jira/browse/SPARK-12157> with
DataFrame API. You can try something like this:

udf(lambda x: list(set(x)), ArrayType(IntegerType()))

or this (to keep order)

udf(lambda xs: list(OrderedDict((x, None) for x in xs)),
    ArrayType(IntegerType()))

instead.

If you really want np.unique you have to convert the output:

udf(lambda x: np.unique(x).tolist(), ArrayType(IntegerType()))













Am Mo., 8. Apr. 2019 um 11:43 Uhr schrieb Sudhir Babu Pothineni <
sbpothin...@gmail.com>:

>
>
>
> Trying to run tests in spark-sklearn, anybody check the below exception
>
> pip freeze:
>
> nose==1.3.7
> numpy==1.16.1
> pandas==0.19.2
> python-dateutil==2.7.5
> pytz==2018.9
> scikit-learn==0.19.2
> scipy==1.2.0
> six==1.12.0
> spark-sklearn==0.3.0
>
> Spark version:
> spark-2.2.3-bin-hadoop2.6/bin/pyspark
>
>
> running into following exception:
>
> ======================================================================
> ERROR: test_scipy_sparse (spark_sklearn.converter_test.CSRVectorUDTTests)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File
> "/home/spothineni/Downloads/spark-sklearn-release-0.3.0/python/spark_sklearn/converter_test.py",
> line 83, in test_scipy_sparse
>     self.assertEqual(df.count(), 1)
>   File
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py",
> line 522, in count
>     return int(self._jdf.count())
>   File
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
> line 1257, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/pyspark/sql/utils.py",
> line 63, in deco
>     return f(*a, **kw)
>   File
> "/home/spothineni/Downloads/spark-2.4.1-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
> line 328, in get_return_value
>     format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o652.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in
> stage 0.0 (TID 11, localhost, executor driver):
> net.razorvine.pickle.PickleException: expected zero arguments for
> construction of ClassDict (for numpy.dtype)
> at
> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
> at
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
> at
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
> at
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
> at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2830)
> at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2829)
> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: net.razorvine.pickle.PickleException: expected zero arguments
> for construction of ClassDict (for numpy.dtype)
> at
> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
> at
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
> at
> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:403)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:409)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ... 1 more
>
>
>

Reply via email to