I narrowed it down to a union operation. Here's my code snippet:
val properties: RDD[((String, String, String), Externalizer[KeyValue])] =
  vertices.map { ve =>
    val (vertices, dsName) = ve
    val rval = GraphConfig.getRval(datasetConf, Constants.VERTICES, dsName)
    val (_, rvalAsc, rvalType) = rval
    println(s"Table name: $dsName, Rval: $rval")
    println(vertices.toDebugString)
    vertices.map { v =>
      val rk = appendHash(boxId(v.id)).getBytes
      val cf = PROP_BYTES
      val cq = boxRval(v.rval, rvalAsc, rvalType).getBytes
      val value = Serializer.serialize(v.properties)
      ((new String(rk), new String(cf), new String(cq)),
        Externalizer(put(rk, cf, cq, value)))
    }
  }.reduce(_.union(_)).sortByKey(numPartitions = 32)
Basically, I read data from multiple tables (Seq[RDD[(key, value)]]) and
transform each record into a KeyValue to be inserted into HBase, so I need
a .reduce(_.union(_)) to combine them into one RDD[(key, value)].
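For reference, the same combination can also be written with SparkContext.union, which takes the whole Seq at once and builds one flat union instead of a chain of nested ones. Whether this has anything to do with the error is unknown. A minimal sketch, assuming a local SparkContext and made-up sample data (UnionSketch and tables are hypothetical names, not part of the job above):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (sortByKey) on Spark 1.x
import org.apache.spark.rdd.RDD

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("union-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical stand-in for the per-table RDDs of (key, value) pairs
      val tables: Seq[RDD[(String, Int)]] = Seq(
        sc.parallelize(Seq("b" -> 2, "a" -> 1)),
        sc.parallelize(Seq("d" -> 4, "c" -> 3)))

      // Pairwise union, as in the snippet above
      val viaReduce = tables.reduce(_.union(_))
      // Single call that unions the whole sequence
      val viaContext = sc.union(tables)

      // Both forms contain the same records
      assert(viaReduce.collect().toSet == viaContext.collect().toSet)

      val sorted = viaContext.sortByKey(numPartitions = 2)
      println(sorted.collect().mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Comparing toDebugString on the two results shows the difference in lineage depth.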
I cannot see what's wrong in my code.
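For context on the error class itself: the JVM throws ExceptionInInitializerError when a static initializer fails, and the body of a Scala object runs as one. The exception returned by getCause usually names the real failure. A minimal sketch, assuming nothing about HBaseStore (BrokenHolder and InitErrorSketch are hypothetical names used only for illustration):

```scala
// Hypothetical object whose initializer throws, standing in for any
// singleton that fails to set itself up on an executor
object BrokenHolder {
  val value: String = sys.error("initializer failed")
}

object InitErrorSketch {
  // Touches BrokenHolder for the first time and returns the wrapped cause,
  // if the JVM reports an ExceptionInInitializerError
  def firstAccess(): Option[Throwable] =
    try {
      val v = BrokenHolder.value
      println(v)
      None
    } catch {
      case e: ExceptionInInitializerError => Some(e.getCause)
    }

  def main(args: Array[String]): Unit =
    println(firstAccess().map(_.getMessage))
}
```

Later accesses to the same object in the same JVM fail with NoClassDefFoundError ("Could not initialize class ..."), so the first occurrence in an executor log is the one that carries the cause.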
Jianshi
On Fri, Jul 25, 2014 at 12:24 PM, Jianshi Huang <[email protected]>
wrote:
> I can successfully run my code in local mode using spark-submit (--master
> local[4]), but I get ExceptionInInitializerError in yarn-client mode.
>
> Any hints as to what the problem is? Is it a closure serialization problem? How
> can I debug it? Any answers would be very helpful.
>
> 14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
> java.lang.ExceptionInInitializerError
>     at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
>     at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1080)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/