[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920596#comment-16920596
 ] 

Qiang Wang commented on SPARK-28927:
------------------------------------

I only tested it on version 2.2.1, which is compatible with our Spark version.
Is it OK to use MLlib from the master branch while keeping Spark at version
2.2.1?

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28927
>                 URL: https://issues.apache.org/jira/browse/SPARK-28927
>             Project: Spark
>          Issue Type: Bug
>          Components: ML
>    Affects Versions: 2.2.1
>            Reporter: Qiang Wang
>            Priority: Major
>
> The stack trace is below:
> {quote}
> 19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block rdd_10916_493 could not be removed as it was not found on disk or in memory
> 19/08/28 07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074)
> java.lang.ArrayIndexOutOfBoundsException: 6741
>  at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972)
>  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763)
>  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>  at org.apache.spark.scheduler.Task.run(Task.scala:108)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception occurs intermittently. We also found that the AUC metric was
> not stable when evaluating the inner product of the user factors and the item
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67,
> which is too unstable for a production environment.
> Dataset size: ~12 billion ratings
> Here is our code:
> val trainData = predataUser
>   .flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
>   .setName(trainDataName)
>   .persist(StorageLevel.MEMORY_AND_DISK_SER)
> case class ALSData(user: Int, item: Int, rating: Float) extends Serializable
> val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
> val als = new ALS
> val paramMap = ParamMap(als.alpha -> 25000).
>   put(als.checkpointInterval, 5).
>   put(als.implicitPrefs, true).
>   put(als.itemCol, "item").
>   put(als.maxIter, 60).
>   put(als.nonnegative, false).
>   put(als.numItemBlocks, 600).
>   put(als.numUserBlocks, 600).
>   put(als.regParam, 4.5).
>   put(als.rank, 25).
>   put(als.userCol, "user")
> als.fit(ratingData, paramMap)
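
For reference, the evaluation described in the issue (AUC over the inner
product of user and item factors) can be sketched roughly as below. This is a
minimal single-machine sketch in plain Scala, not the reporter's evaluation
code and not a Spark API: the names AucSketch, dot and auc are hypothetical,
and the pairwise AUC is quadratic in the sample size, so it only suits small
samples.

```scala
// Hypothetical helper, for illustration only; not part of Spark.
object AucSketch {

  /** Inner product of two factor vectors of the same rank. */
  def dot(u: Array[Float], v: Array[Float]): Double = {
    require(u.length == v.length, "factor vectors must have the same rank")
    var sum = 0.0
    var i = 0
    while (i < u.length) {
      sum += u(i).toDouble * v(i)
      i += 1
    }
    sum
  }

  /**
   * AUC as the fraction of (positive, negative) pairs in which the positive
   * example gets the higher score; ties count as half a correct pair.
   * Input is a sequence of (score, isPositive) pairs.
   */
  def auc(scored: Seq[(Double, Boolean)]): Double = {
    val pos = scored.collect { case (s, true) => s }
    val neg = scored.collect { case (s, false) => s }
    require(pos.nonEmpty && neg.nonEmpty,
      "need at least one positive and one negative example")
    val wins = (for (p <- pos; n <- neg) yield {
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    }).sum
    wins / (pos.size.toLong * neg.size)
  }
}
```

Scoring each (user, item) pair with AucSketch.dot and passing the labelled
scores to AucSketch.auc gives one AUC value per trained model, which makes it
possible to compare runs that share the same dataset and configuration.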


