Hi all,

I am trying to do WURFL lookups in a Spark cluster and I am getting exceptions; I am 
pretty sure the same thing works at small scale, but it fails when I try 
it in Spark. I have already used spark-ec2/copy-dir to copy the WURFL library to the 
workers, and I launched spark-shell with the `--jars` parameter including 
wurfl and its dependencies from the lib/ directory.

To reconstruct the error, let's say that I already have a userAgentRdd, which 
is an RDD[String], and a userAgentSample, which is an Array[String]. I am trying to reuse the 
WURFL engine across records by using mapPartitions, so I can avoid the cost of reloading it for every record.

import net.sourceforge.wurfl.core.GeneralWURFLEngine

/** Resolves one user-agent string against an already-constructed WURFL
  * engine and returns its (brand_name, model_name) capability pair.
  *
  * Curried so the engine can be bound once and the resulting
  * `String => (String, String)` passed straight to `map`.
  */
def lookupModel(wurfl: GeneralWURFLEngine)(userAgent: String) = {
  val device = wurfl.getDeviceForRequest(userAgent)
  (device.getCapability("brand_name"), device.getCapability("model_name"))
}

/** Per-partition lookup: builds one WURFL engine for the whole partition
  * and lazily maps every user agent in it to a (brand, model) pair.
  *
  * @param wurflXmlPath      path to the wurfl.xml/zip, which must exist at the
  *                          same local path on every worker node
  * @param userAgentIterator the partition's user-agent strings
  * @return an iterator of (brand_name, model_name) pairs
  */
def lookupModelPartitions(wurflXmlPath: String)(userAgentIterator:
Iterator[String]) = {
  // Fix: `EngineTarget` was referenced but never imported in the original
  // snippet (only GeneralWURFLEngine was imported at the top).
  import net.sourceforge.wurfl.core.EngineTarget
  val wurfl = new GeneralWURFLEngine(wurflXmlPath)
  wurfl.setEngineTarget(EngineTarget.accuracy)
  // NOTE(review): the engine parses the XML lazily, on the first
  // getDeviceForRequest call inside the returned iterator — that is exactly
  // where the WURFLParsingException in the attached trace is thrown
  // (initIfNeeded -> XMLResource.readData). Presumably the wurfl.zip on the
  // failing worker (10.128.224.227) is corrupt or stale — verify the copy on
  // each worker before suspecting this code.
  userAgentIterator.map(lookupModel(wurfl))
}

// the following will work
// Driver-local smoke test: one engine instance, plain Array.map — no Spark
// serialization or worker filesystem involved, so it only proves the engine
// and data file are good on the driver machine.
val wurflEngine = new 
GeneralWURFLEngine("/root/wurfl-1.6.1.0-release/wurfl.zip")
val userAgentSample = // my local dataset
val modelSample = userAgentSample.map(lookupModel(wurflEngine))

// the following will also work
// (fixed: the path string below originally ended with a typographic quote ”
// — an email-client artifact that would not compile — now a plain ".)
val userAgentRdd = // my spark dataset
val modelRdd =
userAgentRdd.mapPartitions(lookupModelPartitions("/root/wurfl-1.6.1.0-release/wurfl.zip"))
// take(10) only evaluates as many partitions as needed to produce 10 rows,
// so it can succeed while other partitions/workers are still broken.
modelRdd.take(10)

// but the following will not work
// count forces EVERY partition on EVERY worker, so a bad wurfl.zip copy on
// any single worker surfaces here even though take(10) looked fine.
modelRdd.count

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 
491, 10.128.224.227): net.sourceforge.wurfl.core.exc.WURFLRuntimeException: 
WURFL unexpected exception
        at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:286)
        at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.getDeviceForRequest(GeneralWURFLEngine.java:425)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.lookupModel(<console>:23)
        at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(<console>:27)
        at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(<console>:27)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1628)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
        at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLResourceException: 
WURFL unexpected exception
        at 
net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:350)
        at 
net.sourceforge.wurfl.core.resource.XMLResource.getData(XMLResource.java:154)
        at 
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:118)
        at 
net.sourceforge.wurfl.core.resource.DefaultWURFLModel.<init>(DefaultWURFLModel.java:110)
        at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.init(GeneralWURFLEngine.java:304)
        at 
net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:283)
        ... 16 more
Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLParsingException: The 
devices with id generic define more is_wireless_device
        at 
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startCapabilityElement(XMLResource.java:680)
        at 
net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startElement(XMLResource.java:534)
        at 
com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.startElement(AbstractSAXParser.java:509)
        at 
com.sun.org.apache.xerces.internal.parsers.AbstractXMLDocumentParser.emptyElement(AbstractXMLDocumentParser.java:182)
        at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanStartElement(XMLDocumentFragmentScannerImpl.java:1343)
        at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786)
        at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
        at 
com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:510)
        at 
com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:848)
        at 
com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:777)
        at 
com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
        at 
com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.parse(AbstractSAXParser.java:1213)
        at 
com.sun.org.apache.xerces.internal.jaxp.SAXParserImpl$JAXPSAXParser.parse(SAXParserImpl.java:648)
        at 
com.sun.org.apache.xerces.internal.jaxp.SAXParserImpl.parse(SAXParserImpl.java:332)
        at javax.xml.parsers.SAXParser.parse(SAXParser.java:195)
        at 
net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:348)
        ... 21 more

Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Please let me know if you have any experience with this issue.

Thanks,

Zhongxiao

Reply via email to