Hi Muhammad,

Next time, it may help to use http://pastebin.com/ to format and share a
cleaner Scala code snippet so that others can help you more easily. Also,
please paste only the relevant portion of the stack trace that points to
the issue rather than the entire log.

First of all, your log indicates that you are running out of memory, and I
suspect the problem is that you are caching the whole `clickstreamRDD`.
Since you do not reuse it many times, caching it may not buy you much
performance, so you may not need to cache it at all. At the very least,
use a storage level such as `MEMORY_AND_DISK` so that partitions spill to
disk instead of causing out-of-memory errors.
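
For example, a minimal sketch (assuming the standard StorageLevel API;
untested against your setup):

    import org.apache.spark.storage.StorageLevel

    // Instead of clickstreamRDD.cache(), which defaults to MEMORY_ONLY,
    // let partitions that don't fit in memory spill to disk:
    clickstreamRDD.persist(StorageLevel.MEMORY_AND_DISK)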

Secondly, `groupByKey` is very expensive here. It is probably not the root
cause of the job not finishing, but `groupByKey` shuffles all the data to
the reducers. In your case, you can filter first, which runs in parallel
on the mapper side, and then do `groupByKey`; you can also specify a
higher number of tasks when you call `groupByKey`. I would recommend
finding a way to express your logic with `reduceByKey` or `combineByKey`
instead, since both operations reduce or combine the data on the mapper
side, which leads to much less shuffle traffic and much better
performance.
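
For instance, since your blacklist only needs the number of clicks per
guid, you could compute the counts with `reduceByKey` instead of
materializing whole groups. This is just a rough sketch based on your
snippet (the 200 is only a placeholder for a higher number of reduce
tasks, which you would want to tune):

    import org.apache.spark.SparkContext._  // pair-RDD operations (already in your session)
    import org.apache.spark.rdd.RDD

    // Count clicks per guid with map-side combining instead of shipping whole groups.
    val guidCounts: RDD[(String, Int)] =
      guidClickRDD.mapValues(_ => 1).reduceByKey(_ + _, 200)

    // Guids that occur exactly once form the blacklist.
    val blackList: RDD[(String, Int)] = guidCounts.filter { case (_, count) => count == 1 }

    // subtractByKey only looks at the keys, so the value type can stay Int.
    val guidClickFRDD = guidClickRDD.subtractByKey(blackList)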

Finally, you may want to break your pipeline down and run each part
separately, so you can narrow down which step causes the issue and make
debugging easier.
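
For example, forcing each intermediate RDD with an action (here count(),
just as a probe, using the variable names from your snippet) will show
which stage actually triggers the executor loss:

    // Run these one at a time and watch which one fails.
    println(clickstreamRDD.count())
    println(guidClickRDD.count())
    println(blackList.count())
    println(guidClickFRDD.count())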

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Thu, Dec 11, 2014 at 4:48 AM, Muhammad Ahsan
<muhammad.ah...@gmail.com> wrote:
> ----------------------------------------------
> Code
> ----------------------------------------------
> scala> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkContext._
>
> scala> import org.apache.spark.rdd.RDD
> import org.apache.spark.rdd.RDD
>
> scala> import org.apache.spark.sql.SchemaRDD
> import org.apache.spark.sql.SchemaRDD
>
> scala> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.hive.HiveContext
>
> scala> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.{SparkConf, SparkContext}
>
> scala> val hiveContext: HiveContext = new HiveContext(sc)
> hiveContext: org.apache.spark.sql.hive.HiveContext =
> org.apache.spark.sql.hive.HiveContext@2de76244
>
> scala> val numDays = 2
> numDays: Int = 2
>
> scala> case class Click(
> /* about 20 fields of type STRING */
> )
> defined class Click
>
> scala> val inputRDD = new Array[SchemaRDD](numDays)
> inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null)
>
> scala> for (i <- 1 to numDays) {
>      |             if (i < 10) {
>      |                 inputRDD(i - 1) =
> hiveContext.parquetFile("hdfs://................" + i)
>      |             } else {
>      |                 inputRDD(i - 1) =
> hiveContext.parquetFile("hdfs://................" + i)
>      |             }
>      |
>      |         }
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> scala>         var unionRDD = inputRDD(1)
> unionRDD: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[1] at RDD at SchemaRDD.scala:104
>
> scala>         for (i <- 1 to inputRDD.length - 1) {
>      |             unionRDD = unionRDD.unionAll(inputRDD(i))
>      |         }
>
> scala> val inputRDD = unionRDD
> inputRDD: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[2] at RDD at SchemaRDD.scala:104
> scala>
>
> scala> inputRDD.registerTempTable("urlInfo")
>
> scala>         val clickstreamRDD = hiveContext.sql("select * from urlInfo "
> +
>      |             "where guid regexp '^[0-9a-f-]{36}$' " +
>      |             "AND ((callerid  > 3 AND callerid <10000) OR callerid >
> 100000 " +
>      |             "OR (callerid    =3 AND browsertype = 'IE')) " +
>      |             "AND countrycode regexp '^[A-Z]{2}$'")
> clickstreamRDD: org.apache.spark.sql.SchemaRDD =
> SchemaRDD[3] at RDD at SchemaRDD.scala:104
> scala>
>
> scala>         clickstreamRDD.registerTempTable("clickstream")
>
> scala>         clickstreamRDD.cache()
> res4: clickstreamRDD.type =
> SchemaRDD[3] at RDD at SchemaRDD.scala:104
>
> scala>     val guidClickRDD = clickstreamRDD.map(row =>
> (row(7).asInstanceOf[String], {
>      |             val value = Click(row(0).asInstanceOf[String],
>      |                 row(1).asInstanceOf[String],
> row(2).asInstanceOf[String],
>      |                 row(3).asInstanceOf[String],
> row(4).asInstanceOf[String],
>      |                 row(5).asInstanceOf[String],
> row(6).asInstanceOf[String],
>      |                 row(7).asInstanceOf[String],
> row(8).asInstanceOf[String],
>      |                 row(9).asInstanceOf[String],
> row(10).asInstanceOf[String],
>      |                 row(11).asInstanceOf[String],
> row(12).asInstanceOf[String],
>      |                 row(13).asInstanceOf[String],
> row(14).asInstanceOf[String],
>      |                 row(15).asInstanceOf[String],
> row(16).asInstanceOf[String],
>      |                 row(17).asInstanceOf[String],
> row(18).asInstanceOf[String],
>      |                 row(19).asInstanceOf[String])
>      |             value
>      |         }))
> guidClickRDD: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[14] at
> map at <console>:25
>
> scala> val blackList: RDD[(String, Click)] =
> guidClickRDD.groupByKey().filter(row => row._2.size == 1).map(row =>
>      |             (row._1.asInstanceOf[String], Click("", "", "", "", "",
> "", "", "", "", "", "", "", "", "", "", "", "", "", "", "")))
> blackList: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[27] at map
> at <console>:27
>
> scala>         val guidClickFRDD = guidClickRDD.subtractByKey(blackList)
> guidClickFRDD: org.apache.spark.rdd.RDD[(String, Click)] = SubtractedRDD[28]
> at subtractByKey at <console>:29
>
> scala> guidClickFRDD.reduceByKey((x, y) => {
>      |             /* commutative and associative function */
>      |             Click("US", "US", "US", "US", "US", "US", "US", "US",
> "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US", "US")
>      | }).take(200).foreach(println)
>
> ----------------------------------------------
> EXPECTED OUTPUT
> ----------------------------------------------
>
> (Key_A,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
> (Key_B,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
> (Key_C,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
> (Key_D,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-YarnClientClusterScheduler-Lost-executor-Akka-client-disassociated-tp20625p20626.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
