I am still struggling to get fit() to work on my dataset. The Spark ML exception at issue is:
LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?

Comparing my standardized Weight values with the tutorial's values, I see that I have some negative values, while the tutorial's values are all positive. The exception message above complains about a matrix that is not positive definite, so my negative values are probably the issue. The calculation I use to standardize my Weight values, (Weight - Weight_Mean) / Weight_StdDev, produces a negative value whenever a Weight (which can range between 1 and 72000) falls below the mean; for example, Weight = 12 gives (12 - 597.72) / 346.48 ≈ -1.69.

I have a suggestion to try MinMaxScaler, but it operates on a Vector and I have a single value, so I am not sure how to make that work. My stats is very rusty. Is there a way to keep the values positive when standardizing something like my Weight values above? At the bottom of this message, below the quoted thread, I have pasted two untested sketches of what I am considering.

Thanks.
-S

From: Steve Pruitt <bpru...@opentext.com>
Sent: Monday, April 01, 2019 12:39 PM
To: user <user@spark.apache.org>
Subject: [EXTERNAL] - [Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]

After following a tutorial on recommender systems using PySpark / Spark ML, I decided to jump in with my own dataset. I am specifically trying to predict video suggestions based on an implicit feature: the time a video was watched.

I wrote a generator to produce my dataset. I have a total of five videos, each 1200 seconds in length. I randomly selected which videos each user watched and a random watch time between 0 and 1200, and generated 10k records. Weight is the time-watched feature. It looks like this:

UserId,VideoId,Weight
0,1,645
0,2,870
0,3,1075
0,4,486
0,5,900
1,1,353
1,2,988
1,3,152
1,4,953
1,5,641
2,3,12
2,4,444
2,5,87
3,2,658
3,4,270
3,5,530
4,2,722
4,3,255
:

After reading the dataset, I converted all columns to Integer in place. Describing Weight produces:

  summary            Weight
0   count             30136
1    mean  597.717945314574
2  stddev  346.475684454489
3     min                 0
4     max              1200

Next, I standardized the Weight column with:

df = dataset.select(mean('Weight').alias('mean_weight'),
                    stddev('Weight').alias('stddev_weight')) \
            .crossJoin(dataset) \
            .withColumn('weight_scaled',
                        (col('Weight') - col('mean_weight')) / col('stddev_weight'))

df.toPandas().head() shows:

    mean_weight  stddev_weight  UserId  VideoId  Weight  weight_scaled
0    597.717945     346.475684       0        1     645       0.136466
1    597.717945     346.475684       0        2     870       0.785862
2    597.717945     346.475684       0        3    1075       1.377534
3    597.717945     346.475684       0        4     486      -0.322441
4    597.717945     346.475684       0        5     900       0.872448
:
10   597.717945     346.475684       2        3      12      -1.690502
11   597.717945     346.475684       2        4     444      -0.443662
12   597.717945     346.475684       2        5      87      -1.474037
:

After splitting df 80/20 into training and test sets, I defined the ALS algorithm with:

als = ALS(maxIter=10, regParam=0.1, userCol='UserId', itemCol='VideoId',
          implicitPrefs=True, ratingCol='weight_scaled',
          coldStartStrategy='drop')

and then:

model = als.fit(trainingData)

Calling fit() is where I get the following error, which I don't understand:
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-12-20463d9db24b> in <module>
----> 1 model = als.fit(trainingData)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
    286
    287     def _fit(self, dataset):
--> 288         java_model = self._fit_java(dataset)
    289         model = self._create_model(java_model)
    290         return self._copyValues(model)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
    283         """
    284         self._transfer_params_to_java()
--> 285         return self._java_obj.fit(dataset._jdf)
    286
    287     def _fit(self, dataset):

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161
   1162         for temp_arg in temp_args:

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o211.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 61.0 failed 1 times, most recent failure: Lost task 5.0 in stage 61.0 (TID 179, localhost, executor driver): org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
    at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
    at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111)
    at org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1711)
    at org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1652)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:972)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:969)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:969)
    at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
    at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
    at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Thanks in advance.
-S
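Below are the two sketches I mentioned at the top of this message. The first one avoids MinMaxScaler entirely and min-max scales the raw Weight column to [0, 1] with plain DataFrame operations, reusing the same crossJoin pattern as my standardization step. This is an untested sketch, and it assumes max != min (true for my data: 0 vs. 1200):

from pyspark.sql import functions as F

# Compute the min and max of Weight once, then rescale the column so every
# value lands in [0, 1] -- no negatives, unlike the z-score standardization.
stats = dataset.select(F.min('Weight').alias('min_weight'),
                       F.max('Weight').alias('max_weight'))
df = (stats.crossJoin(dataset)
           .withColumn('weight_scaled',
                       (F.col('Weight') - F.col('min_weight'))
                       / (F.col('max_weight') - F.col('min_weight'))))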
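The second sketch is what I think the MinMaxScaler route would look like: wrap the single Weight value in a one-element vector with VectorAssembler, scale it, then pull the scalar back out for ALS. Also untested; weight_vec and weight_scaled_vec are just column names I made up:

from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# MinMaxScaler only accepts a Vector column, so assemble Weight into one.
assembler = VectorAssembler(inputCols=['Weight'], outputCol='weight_vec')
scaler = MinMaxScaler(inputCol='weight_vec', outputCol='weight_scaled_vec')

assembled = assembler.transform(dataset)
scaled = scaler.fit(assembled).transform(assembled)

# Unpack the one-element vector back to a double so it can be the ratingCol.
first_element = udf(lambda v: float(v[0]), DoubleType())
scaled = scaled.withColumn('weight_scaled', first_element('weight_scaled_vec'))

Does either of these look like a reasonable way to keep the weights non-negative?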