Qiang Wang created SPARK-28927:
----------------------------------

             Summary: ArrayIndexOutOfBoundsException and unstable AUC metrics in ALS for datasets with 12 billion instances
                 Key: SPARK-28927
                 URL: https://issues.apache.org/jira/browse/SPARK-28927
             Project: Spark
          Issue Type: Bug
          Components: ML
    Affects Versions: 2.2.1
            Reporter: Qiang Wang


The stack trace is below:
{quote}
19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block rdd_10916_493 could not be removed as it was not found on disk or in memory
19/08/28 07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074)
java.lang.ArrayIndexOutOfBoundsException: 6741
  at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
  at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
{quote}
This exception occurs intermittently. We also found that the AUC metric was not stable when evaluating the inner product of the user factors and the item factors on the same dataset with the same configuration: AUC varied from 0.60 to 0.67, which is not stable enough for a production environment.
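
A minimal sketch of how such an evaluation might look (assuming a fitted ALSModel named {{model}} and a held-out RDD {{testData}} of (user, item, label) triples; both names are illustrative, not taken from our job):

{code:scala}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Learned factors as (id, vector) pairs; model.userFactors / model.itemFactors
// are DataFrames with columns (id: Int, features: Array[Float]).
val userFactors = model.userFactors.rdd
  .map(r => (r.getInt(0), r.getSeq[Float](1).toArray))
val itemFactors = model.itemFactors.rdd
  .map(r => (r.getInt(0), r.getSeq[Float](1).toArray))

// testData: RDD[(Int, Int, Double)] of (user, item, label), label 1.0 for observed interactions.
val scoreAndLabel = testData
  .map { case (user, item, label) => (user, (item, label)) }
  .join(userFactors)
  .map { case (_, ((item, label), uVec)) => (item, (uVec, label)) }
  .join(itemFactors)
  .map { case (_, ((uVec, label), iVec)) =>
    // score = inner product of the user and item factors
    val score = uVec.zip(iVec).map { case (a, b) => (a * b).toDouble }.sum
    (score, label)
  }

val auc = new BinaryClassificationMetrics(scoreAndLabel).areaUnderROC()
{code}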

Dataset size: ~12 billion ratings
Here is our code:
{code:scala}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.storage.StorageLevel
// toDF() assumes a SparkSession named `spark` is in scope
import spark.implicits._

// ~12 billion (user, item, rating) triples
val trainData = predataUser
  .flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
  .setName(trainDataName)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

case class ALSData(user: Int, item: Int, rating: Float) extends Serializable

val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()

val als = new ALS
val paramMap = ParamMap(als.alpha -> 25000).
  put(als.checkpointInterval, 5).
  put(als.implicitPrefs, true).
  put(als.itemCol, "item").
  put(als.maxIter, 60).
  put(als.nonnegative, false).
  put(als.numItemBlocks, 600).
  put(als.numUserBlocks, 600).
  put(als.regParam, 4.5).
  put(als.rank, 25).
  put(als.userCol, "user")
als.fit(ratingData, paramMap)
{code}
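
One configuration detail that may matter here, stated as an assumption rather than something we verified in this job: ALS only performs the periodic checkpointing implied by checkpointInterval when a checkpoint directory has been set on the SparkContext, for example:

{code:scala}
// Hypothetical setup (the path is only an example); without a checkpoint
// directory, checkpointInterval -> 5 is effectively ignored and the ALS
// lineage keeps growing across iterations.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/als-checkpoints")
{code}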


