Hi all,
I am trying to do wurfl lookup in a spark cluster and getting exceptions, I am
pretty sure that the same thing works in small scale. But it fails when I tried
to do it in spark. I used spark-ec2/copy-dir to copy the wurfl library to
workers already and launched the spark-shell with parameter —jars including
wurfl and its dependencies in the lib/ directory.
To reconstruct the error, let’s say that I have a userAgentRdd already, which
is RDD[String] and a userAgentSample of Array[String]. I am trying to reuse the
wurfl engine by doing mapPartitions so I can save time for reloading it.
import net.sourceforge.wurfl.core.GeneralWURFLEngine
def lookupModel(wurfl: GeneralWURFLEngine)(userAgent: String) = {
val device = wurfl.getDeviceForRequest(userAgent)
val brand = device.getCapability("brand_name")
val model = device.getCapability("model_name")
(brand, model)
}
def lookupModelPartitions(wurflXmlPath: String)(userAgentIterator:
Iterator[String]) = {
val wurfl = new GeneralWURFLEngine(wurflXmlPath)
wurfl.setEngineTarget(EngineTarget.accuracy)
userAgentIterator.map(lookupModel(wurfl))
}
// the following will work
val wurflEngine = new
GeneralWURFLEngine("/root/wurfl-1.6.1.0-release/wurfl.zip")
val userAgentSample = // my local dataset
val modelSample = userAgentSample.map(lookupModel(wurflEngine))
// the following will also work
val userAgentRdd = // my spark dataset
val modelRdd =
userAgentRdd.mapPartitions(lookupModelPartitions("/root/wurfl-1.6.1.0-release/wurfl.zip”))
modelRdd.take(10)
// but the following will not work
modelRdd.count
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID
491, 10.128.224.227): net.sourceforge.wurfl.core.exc.WURFLRuntimeException:
WURFL unexpected exception
at
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:286)
at
net.sourceforge.wurfl.core.GeneralWURFLEngine.getDeviceForRequest(GeneralWURFLEngine.java:425)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.lookupModel(<console>:23)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(<console>:27)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(<console>:27)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1628)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLResourceException:
WURFL unexpected exception
at
net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:350)
at
net.sourceforge.wurfl.core.resource.XMLResource.getData(XMLResource.java:154)
at
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:118)
at
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.<init>(DefaultWURFLModel.java:110)
at
net.sourceforge.wurfl.core.GeneralWURFLEngine.init(GeneralWURFLEngine.java:304)
at
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:283)
... 16 more
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLParsingException: The
devices with id generic define more is_wireless_device
at
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startCapabilityElement(XMLResource.java:680)
at
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startElement(XMLResource.java:534)
at
com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.startElement(AbstractSAXParser.java:509)
at
com.sun.org.apache.xerces.internal.parsers.AbstractXMLDocumentParser.emptyElement(AbstractXMLDocumentParser.java:182)
at
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanStartElement(XMLDocumentFragmentScannerImpl.java:1343)
at
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786)
at
com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
at
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:510)
at
com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:848)
at
com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:777)
at
com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
at
com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.parse(AbstractSAXParser.java:1213)
at
com.sun.org.apache.xerces.internal.jaxp.SAXParserImpl$JAXPSAXParser.parse(SAXParserImpl.java:648)
at
com.sun.org.apache.xerces.internal.jaxp.SAXParserImpl.parse(SAXParserImpl.java:332)
at javax.xml.parsers.SAXParser.parse(SAXParser.java:195)
at
net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:348)
... 21 more
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Please let me know if you have experience.
Thanks,
Zhongxiao