Hi there,

Spark version: 1.2


# Submit the CSLog2ES streaming job.
# Positional arguments after the application jar are unpacked by the driver as
# (zkQuorum, group, topics, numThreads).
# NOTE(review): --files ships GeoIP.dat to the YARN containers; with
# --master yarn (client mode) the driver runs locally and does not see it via
# SparkFiles.get — confirm against the Spark-on-YARN docs.
/home/hadoop/spark/bin/spark-submit \
  --class com.litb.bi.CSLog2ES \
  --master yarn \
  --executor-memory 1G \
  --jars /mnt/external/kafka/target/spark-streaming-kafka_2.10-1.2.0.jar,/mnt/external/kafka/target/zkclient-0.3.jar,/mnt/external/kafka/target/metrics-core-2.2.0.jar,/mnt/external/kafka/target/kafka_2.10-0.8.0.jar,elasticsearch-hadoop-2.1.0.Beta3.jar,geoip-api-1.2.13.jar \
  --files /mnt/GeoIP.dat \
  BILog-1.1-SNAPSHOT.jar 54.175.174.144 test test_ctrlitb 2


In my code, I want to use GeoIP.dat (shipped with `--files`) to look up the location of the IP addresses in the clickstream log:


val Array(zkQuorum, group, topics, numThreads) = args
  val conf = new SparkConf().setAppName("Kafka CTRLog to ES")
  // Enable the receiver write-ahead log for the Kafka receiver.
  // The key Spark reads is "writeAheadLog" (singular); the previous
  // "writeAheadLogs" spelling is not a Spark setting, so it had no effect.
  // NOTE(review): the write-ahead log also requires a checkpoint directory
  // (ssc.checkpoint(dir)); that call is not visible in this snippet — confirm.
  conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  conf.set("es.index.auto.create", "true")
  conf.set("es.nodes", "10.5.2.250")
  //  conf.set("spark.serializer", classOf[KryoSerializer].getName)
  val sc = new SparkContext(conf)
  // 1-second micro-batches.
  val ssc = new StreamingContext(sc, Seconds(1))
  // Every topic in the comma-separated list gets the same receiver thread count.
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  // Keep only the message value (_._2) of each Kafka (key, value) record.
  val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
    StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

  // GeoIP lookup table.
  // NOTE(review): despite being meant for executors, these lines run on the
  // driver (the FileNotFoundException is thrown in thread "main" from
  // CSLog2ES$.main). With --master yarn, files passed via --files are
  // distributed to the YARN containers, so SparkFiles.get here resolves to a
  // driver-local temp directory that does not contain GeoIP.dat. LookupService
  // is also presumably not Serializable, so a driver-side instance cannot be
  // captured by DStream closures. Consider creating it lazily on the executors
  // instead (e.g. inside mapPartitions, or a @transient lazy val in a
  // singleton object) — TODO confirm against the code that uses `cl`.
  val geofile_path = SparkFiles.get("GeoIP.dat")
  val cl = new LookupService(geofile_path,
    LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE)


I got the following exception:


2015-02-08 06:51:17,064 INFO [main] handler.ContextHandler 
(ContextHandler.java:startContext(737)) - started 
o.e.j.s.ServletContextHandler{/streaming,null}
2015-02-08 06:51:17,065 INFO [main] handler.ContextHandler 
(ContextHandler.java:startContext(737)) - started 
o.e.j.s.ServletContextHandler{/streaming/json,null}
Exception in thread "main" java.io.FileNotFoundException: 
/tmp/spark-d85f0f21-2e66-4ed7-ae31-47564c8dfefd/GeoIP.dat (No such file or 
directory)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.init(RandomAccessFile.java:241)
at com.maxmind.geoip.LookupService.init(LookupService.java:282)
at com.maxmind.geoip.LookupService.init(LookupService.java:264)
at com.litb.bi.CSLog2ES$.main(CSLog2ES.scala:51)
at com.litb.bi.CSLog2ES.main(CSLog2ES.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


----------
Shen Zhun
Data Mining at LightInTheBox.com
Email:shenzhunal...@gmail.com

Reply via email to