[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-28927: ------------------------------ Issue Type: Improvement (was: Bug) Priority: Minor (was: Major) Summary: Improve error for ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances (was: ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances) > Improve error for ArrayIndexOutOfBoundsException and Not-stable AUC metrics > in ALS for datasets with 12 billion instances > ------------------------------------------------------------------------------------------------------------------------- > > Key: SPARK-28927 > URL: https://issues.apache.org/jira/browse/SPARK-28927 > Project: Spark > Issue Type: Improvement > Components: ML > Affects Versions: 2.2.1 > Reporter: Qiang Wang > Assignee: Liang-Chi Hsieh > Priority: Minor > Attachments: image-2019-09-02-11-55-33-596.png > > > The stack trace is below: > {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 > BlockManager: Block rdd_10916_493 could not be removed as it was not found on > disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for > task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) > java.lang.ArrayIndexOutOfBoundsException: 6741 at > org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460) > at > org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at > org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041) > at > org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032) > at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at > org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) > at > org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) > at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141) > at > org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at scala.collection.immutable.List.foreach(List.scala:381) at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at > org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at > org.apache.spark.scheduler.Task.run(Task.scala:108) at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {quote} > This exception happened sometimes. And we also found that the AUC metric was > not stable when evaluating the inner product of the user factors and the item > factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 > which was not stable for production environment. > Dataset capacity: ~12 billion ratings > Here is the our code: > {code:java} > val hivedata = sc.sql(sqltext).select("id", "dpid", "score", "tag") > .repartition(6000).persist(StorageLevel.MEMORY_AND_DISK_SER) > val zeroValueArrItem = ArrayBuffer[(String, Int)]() > val predataItem = hivedata. > map(r => (r.getString(0), (r.getString(1), r.getInt(2)))).rdd. > aggregateByKey(zeroValueArrItem, 6000)((a, b) => a += b, (a, b) => a ++ b). > zipWithIndex(). > setName(predataItemName). > persist(StorageLevel.MEMORY_AND_DISK_SER) > val zeroValueArr = ArrayBuffer[(Int, Int)]() > val predataUser = predataItem. > flatMap(r => r._1._2.map(y => (y._1, (r._2.toInt, y._2)))). > aggregateByKey(zeroValueArr, 6000)((a, b) => a += b, (a, b) => a ++ b). > > zipWithIndex().setName(predataUserName).persist(StorageLevel.MEMORY_AND_DISK_SER) > val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, > y._2.toFloat))) > .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)case class > ALSData(user:Int, item:Int, rating:Float) extends Serializable > val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF() > val als = new ALS > val paramMap = ParamMap(als.alpha -> 25000). > put(als.checkpointInterval, 5). > put(als.implicitPrefs, true). > put(als.itemCol, "item"). > put(als.maxIter, 60). > put(als.nonnegative, false). > put(als.numItemBlocks, 600). > put(als.numUserBlocks, 600). > put(als.regParam, 4.5). > put(als.rank, 25). > put(als.userCol, "user") > als.fit(ratingData, paramMap){code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org