Hi there,
Spark version: 1.2

/home/hadoop/spark/bin/spark-submit --class com.litb.bi.CSLog2ES --master yarn --executor-memory 1G --jars /mnt/external/kafka/target/spark-streaming-kafka_2.10-1.2.0.jar,/mnt/external/kafka/target/zkclient-0.3.jar,/mnt/external/kafka/target/metrics-core-2.2.0.jar,/mnt/external/kafka/target/kafka_2.10-0.8.0.jar,elasticsearch-hadoop-2.1.0.Beta3.jar,geoip-api-1.2.13.jar --files /mnt/GeoIP.dat BILog-1.1-SNAPSHOT.jar 54.175.174.144 test test_ctrlitb 2

In my code, I want to use GeoIP.dat to parse the IPs in the clickstream log.

val Array(zkQuorum, group, topics, numThreads) = args
val conf = new SparkConf().setAppName("Kafka CTRLog to ES")
conf.set("spark.streaming.receiver.writeAheadLogs.enable", "true")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "10.5.2.250")
// conf.set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
// geoip file on executor
val geofile_path = SparkFiles.get("GeoIP.dat")
val cl = new LookupService(geofile_path, LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE)

I got the following exception:

2015-02-08 06:51:17,064 INFO [main] handler.ContextHandler (ContextHandler.java:startContext(737)) - started o.e.j.s.ServletContextHandler{/streaming,null}
2015-02-08 06:51:17,065 INFO [main] handler.ContextHandler (ContextHandler.java:startContext(737)) - started o.e.j.s.ServletContextHandler{/streaming/json,null}
Exception in thread "main" java.io.FileNotFoundException: /tmp/spark-d85f0f21-2e66-4ed7-ae31-47564c8dfefd/GeoIP.dat (No such file or directory)
    at java.io.RandomAccessFile.open(Native Method)
    at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
    at com.maxmind.geoip.LookupService.<init>(LookupService.java:282)
    at com.maxmind.geoip.LookupService.<init>(LookupService.java:264)
    at com.litb.bi.CSLog2ES$.main(CSLog2ES.scala:51)
    at com.litb.bi.CSLog2ES.main(CSLog2ES.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

----------
Shen Zhun
Data Mining at LightInTheBox.com
Email:shenzhunal...@gmail.com