Re: Multiple Kafka topics processing in Spark 2.2
Hi, Alonso. Thanks! I've read about this but did not quite understand it. Picking out the topic name of a Kafka message seems like a simple task, but the example code looks complicated, with redundant info. Why do we need offsetRanges here, and is there an easier way to achieve this?

Cheers,
Dan

2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman:
> Hi, reading the official doc
> <http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html>,
> I think you can do it this way:
>
> import org.apache.spark.streaming.kafka._
>
> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>
> // Hold a reference to the current offset ranges, so it can be used downstream
> var offsetRanges = Array.empty[OffsetRange]
>
> directKafkaStream.transform { rdd =>
>   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   rdd
> }.map {
>   ...
> }.foreachRDD { rdd =>
>   for (o <- offsetRanges) {
>     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>   }
> }
>
> 2017-09-06 14:38 GMT+02:00 Dan Dong:
>> Hi, All,
>> I have one issue here about how to process multiple Kafka topics in a
>> Spark 2.* program. My question is: how to get the topic name from a
>> message received from Kafka? E.g.:
>>
>> ..
>> val messages = KafkaUtils.createDirectStream[String, String,
>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>
>> // Get the lines, split them into words, count the words and print
>> val lines = messages.map(_._2)
>> val words = lines.flatMap(_.split(" "))
>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>> wordCounts.print()
>> ..
>>
>> Kafka sends the messages on multiple topics through the console producer,
>> for example. But when Spark receives a message, how will it know which
>> topic the message came from? Thanks a lot for any help!
>>
>> Cheers,
>> Dan
>
> --
> Alonso Isidoro Roman
> https://about.me/alonso.isidoro.roman
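For reference, the newer spark-streaming-kafka-0-10 integration (usable with Spark 2.x) avoids the offsetRanges bookkeeping entirely: each record arrives as a Kafka ConsumerRecord, which already carries its topic. A minimal sketch of that approach, assuming a hypothetical broker at localhost:9092 and hypothetical topic names topicA/topicB:

```
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object TopicAwareWordCount {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("TopicAwareWordCount"), Seconds(5))

    // Hypothetical broker address, group id and topic names -- adjust to your setup.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "topic-aware-wordcount",
      "auto.offset.reset"  -> "latest")
    val topics = Set("topicA", "topicB")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    // record.topic tells you which topic each message came from,
    // so the word counts can be keyed by (topic, word).
    val wordCountsByTopic = stream
      .flatMap(record => record.value.split(" ").map(word => ((record.topic, word), 1L)))
      .reduceByKey(_ + _)
    wordCountsByTopic.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The 0-8 direct stream used in the quoted code only exposes the topic per partition (via HasOffsetRanges) or through a message handler, which is why the documented example looks more involved than the task sounds.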
Multiple Kafka topics processing in Spark 2.2
Hi, All,
I have one issue here about how to process multiple Kafka topics in a Spark 2.* program. My question is: how to get the topic name from a message received from Kafka? E.g.:

..
val messages = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
..

Kafka sends the messages on multiple topics through the console producer, for example. But when Spark receives a message, how will it know which topic the message came from? Thanks a lot for any help!

Cheers,
Dan
Could not access Spark webUI on OpenStack VMs
Hi, all,
I'm having a problem accessing the web UI of my Spark cluster. The cluster is composed of a few virtual machines running on an OpenStack platform. The VMs are launched from the CentOS 7.0 server image available from the official site. Spark itself runs well: the master and worker processes are all up and running, and running the SparkPi example gives the expected result. So the question is: how do I debug such a problem? Could it be a problem inherent to the CentOS image, since it is not a desktop version and some graphics packages might be missing from the VM? Or is it an iptables settings problem coming from OpenStack, since OpenStack configures a complex network internally and might block certain communication? Has anybody seen similar problems? Any hints will be appreciated. Thanks!

Cheers,
Dong
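The web UI is plain HTTP served by Spark itself (by default port 8080 for the standalone master, 8081 for workers, 4040 for a running application), so no graphics packages are needed on the VMs. A quick way to separate the two hypotheses is to test whether those ports are reachable from the machine running the browser; if a plain TCP connect fails, the OpenStack security-group/iptables rules are the more likely culprit. A small sketch, assuming a hypothetical master host name spark-master:

```
import java.net.{InetSocketAddress, Socket}
import scala.util.{Failure, Success, Try}

object UiPortCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical host; ports are Spark's defaults (8080 master UI, 8081 worker UI, 4040 app UI).
    val host  = if (args.nonEmpty) args(0) else "spark-master"
    val ports = Seq(8080, 8081, 4040)

    ports.foreach { port =>
      val attempt = Try {
        val socket = new Socket()
        try socket.connect(new InetSocketAddress(host, port), 3000) // 3-second timeout
        finally socket.close()
      }
      attempt match {
        case Success(_) => println(s"$host:$port is reachable")
        case Failure(e) => println(s"$host:$port is NOT reachable: ${e.getMessage}")
      }
    }
  }
}
```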
Re: java.lang.NoSuchMethodError for "list.toMap".
Hi, Akhil,
Yes, in build.sbt I had wrongly set it to the Scala version 2.11.6 installed on the cluster; it is fixed now. Thanks!

Cheers,
Dan

2015-07-27 2:29 GMT-05:00 Akhil Das:
> What's in your build.sbt? You could be messing with the Scala version, it seems.
>
> Thanks
> Best Regards
>
> On Fri, Jul 24, 2015 at 2:15 AM, Dan Dong wrote:
>> Hi,
>> When I ran the following simple Spark program with spark-submit:
>>
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.SparkContext
>> import org.apache.spark._
>> import SparkContext._
>>
>> object TEST2{
>>   def main(args: Array[String])
>>   {
>>     val conf = new SparkConf().setAppName("TEST")
>>     val sc = new SparkContext(conf)
>>
>>     val list = List(("aa",1),("bb",2),("cc",3))
>>     val maps = list.toMap
>>   }
>> }
>>
>> I got java.lang.NoSuchMethodError for the line "val maps=list.toMap".
>> But in spark-shell, or in plain scala, there is no problem:
>>
>> scala> val list=List(("aa",1),("bb",2),("cc",3))
>> list: List[(String, Int)] = List((aa,1), (bb,2), (cc,3))
>>
>> scala> val maps=list.toMap
>> maps: scala.collection.immutable.Map[String,Int] = Map(aa -> 1, bb -> 2, cc -> 3)
>>
>> So what am I missing in spark-submit to use the "toMap" method? I compile
>> the program with "sbt package" without any problem. Thanks!
>>
>> Cheers,
>> Dan
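For the archive: the root cause here is binary incompatibility between Scala major versions. The pre-built Spark 1.x distributions are compiled against Scala 2.10, so an application compiled with scalaVersion 2.11.x references 2.11-only library methods (the implicit evidence behind List.toMap is one example) and fails with NoSuchMethodError on a 2.10 cluster. A minimal build.sbt sketch; the version numbers are placeholders to match to your cluster:

```
// build.sbt -- a minimal sketch; version numbers are placeholders.
name := "test2"

version := "1.0"

// Must match the Scala major version your Spark distribution was built with
// (pre-built Spark 1.x tarballs are compiled against Scala 2.10).
scalaVersion := "2.10.4"

// "provided": the cluster supplies Spark itself at runtime via spark-submit.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"
```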
java.lang.NoSuchMethodError for "list.toMap".
Hi,
When I ran the following simple Spark program with spark-submit:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark._
import SparkContext._

object TEST2{
  def main(args: Array[String])
  {
    val conf = new SparkConf().setAppName("TEST")
    val sc = new SparkContext(conf)

    val list = List(("aa",1),("bb",2),("cc",3))
    val maps = list.toMap
  }
}

I got java.lang.NoSuchMethodError for the line "val maps=list.toMap". But in spark-shell, or in plain scala, there is no problem:

scala> val list=List(("aa",1),("bb",2),("cc",3))
list: List[(String, Int)] = List((aa,1), (bb,2), (cc,3))

scala> val maps=list.toMap
maps: scala.collection.immutable.Map[String,Int] = Map(aa -> 1, bb -> 2, cc -> 3)

So what am I missing in spark-submit to use the "toMap" method? I compile the program with "sbt package" without any problem. Thanks!

Cheers,
Dan
Re: spark-submit and spark-shell behaviors mismatch.
The problem should be "toMap", as I tested that "val maps2=maps.collect" runs OK. When I run spark-shell, I run with the "--master mesos://cluster-1:5050" parameter, which is the same one I use for spark-submit. Confused here.

2015-07-22 20:01 GMT-05:00 Yana Kadiyska:
> Is it complaining about "collect" or "toMap"? In either case this error is
> indicative of an old version usually -- any chance you have an old
> installation of Spark somehow? Or Scala? You can try running spark-submit
> with --verbose. Also, when you say it runs with spark-shell, do you run
> spark-shell in local mode or with --master? I'd try with --master <the
> master you use for spark-submit>.
>
> Also, if you're using standalone mode I believe the worker log contains
> the launch command for the executor -- you probably want to examine that
> classpath carefully.
>
> On Wed, Jul 22, 2015 at 5:25 PM, Dan Dong wrote:
>> Hi,
>>
>> I have a simple test Spark program as below. The strange thing is that
>> it runs well under spark-shell, but gets a runtime error of
>>
>> java.lang.NoSuchMethodError:
>>
>> in spark-submit, which indicates that the line
>>
>> val maps2=maps.collect.toMap
>>
>> has a problem. But why does the compilation have no problem, and it works
>> well under spark-shell (==> maps2: scala.collection.immutable.Map[Int,String] =
>> Map(269953 -> once, 97 -> a, 451002 -> upon, 117481 -> was, 226916 ->
>> there, 414413 -> time, 146327 -> king))? Thanks!
>>
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.apache.spark.mllib.feature.HashingTF
>> import org.apache.spark.mllib.linalg.Vector
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.SparkContext
>> import org.apache.spark._
>> import SparkContext._
>>
>> val docs=sc.parallelize(Array(Array("once" ,"upon", "a", "time"),
>>   Array("there", "was", "a", "king")))
>>
>> val hashingTF = new HashingTF()
>>
>> val maps=docs.flatMap{term=>term.map(ele=>(hashingTF.indexOf(ele),ele))}
>>
>> val maps2=maps.collect.toMap
>>
>> Cheers,
>>
>> Dan
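One way to pin down a mismatch like this is to print the Spark and Scala versions the driver and the executors actually see at runtime and compare them with what build.sbt declares. A small diagnostic sketch (not part of the original program), meant to be packaged and run through the same spark-submit command:

```
import org.apache.spark.{SparkConf, SparkContext}

object VersionCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("VersionCheck"))

    // Versions seen by the driver JVM.
    println(s"driver  : spark=${sc.version}, scala=${scala.util.Properties.versionString}")

    // Versions seen by the executor JVMs (one distinct line expected).
    sc.parallelize(1 to 4, 4)
      .map(_ => s"executor: scala=${scala.util.Properties.versionString}, " +
                s"java=${System.getProperty("java.version")}")
      .distinct()
      .collect()
      .foreach(println)

    sc.stop()
  }
}
```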
spark-submit and spark-shell behaviors mismatch.
Hi,

I have a simple test Spark program as below. The strange thing is that it runs well under spark-shell, but gets a runtime error of

java.lang.NoSuchMethodError:

in spark-submit, which indicates that the line

val maps2=maps.collect.toMap

has a problem. But why does the compilation have no problem, and it works well under spark-shell (==> maps2: scala.collection.immutable.Map[Int,String] = Map(269953 -> once, 97 -> a, 451002 -> upon, 117481 -> was, 226916 -> there, 414413 -> time, 146327 -> king))? Thanks!

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark._
import SparkContext._

val docs=sc.parallelize(Array(Array("once" ,"upon", "a", "time"),
  Array("there", "was", "a", "king")))

val hashingTF = new HashingTF()

val maps=docs.flatMap{term=>term.map(ele=>(hashingTF.indexOf(ele),ele))}

val maps2=maps.collect.toMap

Cheers,

Dan
Re: How to share a Map among RDDS?
Thanks Andrew, exactly.

2015-07-22 14:26 GMT-05:00 Andrew Or:
> Hi Dan,
>
> `map2` is a broadcast variable, not your map. To access the map on the
> executors you need to do `map2.value(a)`.
>
> -Andrew
>
> 2015-07-22 12:20 GMT-07:00 Dan Dong:
>> Hi, Andrew,
>> If I broadcast the Map:
>>
>> val map2=sc.broadcast(map1)
>>
>> I will get a compilation error:
>>
>> org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
>> does not take parameters
>> [error] val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})
>>
>> Seems it's still an RDD, so how do I access it by value=map2(key)? Thanks!
>>
>> Cheers,
>> Dan
>>
>> 2015-07-22 2:20 GMT-05:00 Andrew Or:
>>> Hi Dan,
>>>
>>> If the map is small enough, you can just broadcast it, can't you? It
>>> doesn't have to be an RDD. Here's an example of broadcasting an array and
>>> using it on the executors:
>>> https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
>>>
>>> -Andrew
>>>
>>> 2015-07-21 19:56 GMT-07:00 ayan guha:
>>>> Either you have to do rdd.collect and then broadcast, or you can do a join.
>>>> On 22 Jul 2015 07:54, "Dan Dong" wrote:
>>>>
>>>>> Hi, All,
>>>>>
>>>>> I am trying to access a Map from RDDs that are on different compute
>>>>> nodes, but without success. The Map is like:
>>>>>
>>>>> val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)
>>>>>
>>>>> All RDDs will have to check against it to see if the key is in the Map
>>>>> or not, so it seems I have to make the Map itself global. The problem is that
>>>>> if the Map is stored as RDDs and spread across the different nodes, each
>>>>> node will only see a piece of the Map and the info will not be complete to
>>>>> check against the Map (and then replace the key with the corresponding
>>>>> value). E.g.:
>>>>>
>>>>> val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
>>>>>
>>>>> But if the Map is not an RDD, how to share it, like sc.broadcast(map1)?
>>>>>
>>>>> Any idea about this? Thanks!
>>>>>
>>>>> Cheers,
>>>>> Dan
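To make the resolution concrete for the archive, here is a minimal self-contained sketch of the pattern: broadcast the plain Scala Map once and read it on the executors through .value. The data below is made up for illustration; only the map2.value(a) access is the point:

```
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastMapExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BroadcastMapExample"))

    // A small lookup map living on the driver (illustrative data).
    val map1 = Map(1 -> "once", 2 -> "upon", 3 -> "a", 4 -> "time")

    // Ship one read-only copy of the map to every executor.
    val map2 = sc.broadcast(map1)

    // Hypothetical input: per-document sequences of (id, weight) pairs.
    val vecs = sc.parallelize(Seq(Seq((1, 0.5), (2, 1.0)), Seq((3, 2.0), (4, 0.1))))

    // On the executors, read the broadcast through .value -- map2(a) would not compile.
    val matchs = vecs.map(term => term.map { case (a, b) => (map2.value(a), b) })

    matchs.collect().foreach(println)
    sc.stop()
  }
}
```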
Re: How to share a Map among RDDS?
Hi, Andrew,
If I broadcast the Map:

val map2=sc.broadcast(map1)

I will get a compilation error:

org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]]
does not take parameters
[error] val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})

Seems it's still an RDD, so how do I access it by value=map2(key)? Thanks!

Cheers,
Dan

2015-07-22 2:20 GMT-05:00 Andrew Or:
> Hi Dan,
>
> If the map is small enough, you can just broadcast it, can't you? It
> doesn't have to be an RDD. Here's an example of broadcasting an array and
> using it on the executors:
> https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
>
> -Andrew
>
> 2015-07-21 19:56 GMT-07:00 ayan guha:
>> Either you have to do rdd.collect and then broadcast, or you can do a join.
>> On 22 Jul 2015 07:54, "Dan Dong" wrote:
>>
>>> Hi, All,
>>>
>>> I am trying to access a Map from RDDs that are on different compute
>>> nodes, but without success. The Map is like:
>>>
>>> val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)
>>>
>>> All RDDs will have to check against it to see if the key is in the Map
>>> or not, so it seems I have to make the Map itself global. The problem is that
>>> if the Map is stored as RDDs and spread across the different nodes, each
>>> node will only see a piece of the Map and the info will not be complete to
>>> check against the Map (and then replace the key with the corresponding
>>> value). E.g.:
>>>
>>> val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
>>>
>>> But if the Map is not an RDD, how to share it, like sc.broadcast(map1)?
>>>
>>> Any idea about this? Thanks!
>>>
>>> Cheers,
>>> Dan
How to share a Map among RDDS?
Hi, All,

I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like:

val map1 = Map("aa"->1,"bb"->2,"cc"->3,...)

All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as RDDs and spread across the different nodes, each node will only see a piece of the Map and the info will not be complete to check against the Map (and then replace the key with the corresponding value). E.g.:

val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})

But if the Map is not an RDD, how to share it, like sc.broadcast(map1)?

Any idea about this? Thanks!

Cheers,
Dan
Re: To access elements of a org.apache.spark.mllib.linalg.Vector
Yes, it works! Thanks a lot, Burak!

Cheers,
Dan

2015-07-14 14:34 GMT-05:00 Burak Yavuz:
> Hi Dan,
>
> You could zip the indices with the values if you like.
>
> ```
> val sVec = sparseVector(1).asInstanceOf[org.apache.spark.mllib.linalg.SparseVector]
> val map = sVec.indices.zip(sVec.values).toMap
> ```
>
> Best,
> Burak
>
> On Tue, Jul 14, 2015 at 12:23 PM, Dan Dong wrote:
>> Hi,
>> I'm wondering how to access elements of a linalg.Vector, e.g.:
>>
>> sparseVector: Seq[org.apache.spark.mllib.linalg.Vector] =
>>   List((3,[1,2],[1.0,2.0]), (3,[0,1,2],[3.0,4.0,5.0]))
>>
>> scala> sparseVector(1)
>> res16: org.apache.spark.mllib.linalg.Vector = (3,[0,1,2],[3.0,4.0,5.0])
>>
>> How to get the indices (0,1,2) and values (3.0,4.0,5.0) of, e.g.,
>> (3,[0,1,2],[3.0,4.0,5.0])?
>> It will be useful to map them into index->value pairs like:
>> 0->3.0
>> 1->4.0
>> 2->5.0
>>
>> I could not find such info from:
>> https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.mllib.linalg.Vector
>>
>> Thanks.
>>
>> Cheers,
>> Dan
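A self-contained version of Burak's suggestion, rebuilding the two vectors from the thread explicitly so the snippet can be pasted into spark-shell as-is:

```
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}

// The two example vectors from the thread, constructed explicitly.
val sparseVector: Seq[Vector] = Seq(
  Vectors.sparse(3, Array(1, 2), Array(1.0, 2.0)),
  Vectors.sparse(3, Array(0, 1, 2), Array(3.0, 4.0, 5.0)))

// Vectors.sparse returns a SparseVector, so the cast is safe here.
val sVec = sparseVector(1).asInstanceOf[SparseVector]

// Pair each index with its value: Map(0 -> 3.0, 1 -> 4.0, 2 -> 5.0)
val map = sVec.indices.zip(sVec.values).toMap
map.toSeq.sortBy(_._1).foreach { case (i, v) => println(s"$i -> $v") }
```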
To access elements of a org.apache.spark.mllib.linalg.Vector
Hi,
I'm wondering how to access elements of a linalg.Vector, e.g.:

sparseVector: Seq[org.apache.spark.mllib.linalg.Vector] =
  List((3,[1,2],[1.0,2.0]), (3,[0,1,2],[3.0,4.0,5.0]))

scala> sparseVector(1)
res16: org.apache.spark.mllib.linalg.Vector = (3,[0,1,2],[3.0,4.0,5.0])

How to get the indices (0,1,2) and values (3.0,4.0,5.0) of, e.g., (3,[0,1,2],[3.0,4.0,5.0])?
It will be useful to map them into index->value pairs like:
0->3.0
1->4.0
2->5.0

I could not find such info from:
https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.mllib.linalg.Vector

Thanks.

Cheers,
Dan
How to specify PATHS for user defined functions.
Hi, All,
I have a function and want to access it in my Spark programs, but I got
"Exception in thread "main" java.lang.NoSuchMethodError" in spark-submit. I put the function under ./src/main/scala/com/aaa/MYFUNC/MYFUNC.scala:

package com.aaa.MYFUNC

object MYFUNC{
  def FUNC1(input: List[String]) = {
    ..
  }
}

and in my Spark program I import it like:

import com.aaa.MYFUNC._
...
val aaa=List("import", "org", "apache", "spark", "SparkContext")
val res=MYFUNC.FUNC1(aaa)
...

But after I ran "sbt package", set the CLASSPATH and ran spark-submit on the program, I got the above error. It's strange that I can import this package and run the function "val res=MYFUNC.FUNC1(aaa)" under spark-shell successfully. What could the problem be? Thanks!

Cheers,
Dan
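A likely cause, offered as a guess: setting CLASSPATH on the submitting machine does not ship the class to the driver and executors, so the jar that contains com.aaa.MYFUNC has to travel with the application, either bundled into a single assembly jar or passed explicitly with spark-submit --jars. A small sketch with a hypothetical FUNC1 body (the original body is elided in the post) and the submit command as a comment; class and jar names are placeholders:

```
// src/main/scala/com/aaa/MYFUNC/MYFUNC.scala
package com.aaa.MYFUNC

object MYFUNC {
  // Hypothetical body for illustration only -- the post elides the real one.
  def FUNC1(input: List[String]): List[String] =
    input.map(_.toLowerCase).distinct
}

// Submit so that the helper jar reaches the driver and the executors, e.g.:
//   spark-submit --class MainApp \
//     --jars /path/to/myfunc_2.10-1.0.jar \
//     /path/to/mainapp_2.10-1.0.jar
// (class name and jar paths above are placeholders)
```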
question about the TFIDF.
Hi, All,
When I try to follow the document about tfidf from:
http://spark.apache.org/docs/latest/mllib-feature-extraction.html

val conf = new SparkConf().setAppName("TFIDF")
val sc=new SparkContext(conf)
val documents=sc.textFile("hdfs://cluster-test-1:9000/user/ubuntu/textExample.txt").map(_.split(" ").toSeq)

val hashingTF = new HashingTF()
val tf= hashingTF.transform(documents)
tf.cache()
val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)
val rdd=tfidf.map { vec => vec}
rdd.saveAsTextFile("/user/ubuntu/aaa")

I got the following 3 lines of output, which correspond to the 3 lines of my input file (each line can be viewed as a separate document):

(1048576,[3211,72752,119839,413342,504006,714241],[1.3862943611198906,0.6931471805599453,0.0,0.6931471805599453,0.6931471805599453,0.6931471805599453])
(1048576,[53232,96852,109270,119839],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.0])
(1048576,[3139,5740,119839,502586,503762],[0.6931471805599453,0.6931471805599453,0.0,0.6931471805599453,0.6931471805599453])

But how do I interpret this? How do I match words to the tfidf values? E.g.:
word1->1.3862943611198906
word2->0.6931471805599453
..

In general, how should people interpret/analyze the "tfidf" produced by the following? Thanks!

val tfidf = idf.transform(tf)

Cheers,
Dan
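For what it's worth: HashingTF hashes every term into a 1,048,576-dimensional index space, so each output line reads as (vector size, [term indices], [TF-IDF weights]); an index whose weight is 0.0 belongs to a term that occurs in every document (its IDF is 0). To recover word -> weight pairs you can hash each word again with the same HashingTF and read the weight at that index. A minimal sketch continuing from the variables above (documents, hashingTF, tfidf as defined in the post):

```
// Pair each document's terms with that document's TF-IDF vector.
// zip lines up here because tfidf was derived from documents through
// per-partition transformations, so both RDDs match element for element.
val wordWeights = documents.zip(tfidf).map { case (terms, vec) =>
  // Hashing a term again yields the index it was counted under.
  terms.distinct.map(term => term -> vec(hashingTF.indexOf(term)))
}

wordWeights.collect().foreach(println)
```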
Re: multiple programs compilation by sbt.
Hi, Ted, I will have a look at it, thanks a lot.

Cheers,
Dan

On Apr 29, 2015, 5:00 PM, "Ted Yu" wrote:
> Have you looked at
> http://www.scala-sbt.org/0.13/tutorial/Multi-Project.html ?
>
> Cheers
>
> On Wed, Apr 29, 2015 at 2:45 PM, Dan Dong wrote:
>> Hi,
>> Following the Quick Start guide:
>> https://spark.apache.org/docs/latest/quick-start.html
>>
>> I could compile and run a Spark program successfully; now my question is
>> how to compile multiple programs with sbt in one go. E.g., two programs as:
>>
>> ./src
>> ./src/main
>> ./src/main/scala
>> ./src/main/scala/SimpleApp_A.scala
>> ./src/main/scala/SimpleApp_B.scala
>>
>> Hopefully with "sbt package" I will get two .jar files, one for each of the
>> source programs, so I can run them separately in Spark. I tried to create
>> two .sbt files, one for each program, but found that only one .jar file is created.
>>
>> ./simpleA.sbt
>> name := "Simple Project A"
>> version := "1.0"
>> scalaVersion := "2.10.4"
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
>>
>> ./simpleB.sbt
>> name := "Simple Project B"
>> version := "1.0"
>> scalaVersion := "2.10.4"
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
>>
>> Does anybody know how to do it?
>>
>> Cheers,
>> Dan
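Following the multi-project setup Ted links to, a single build.sbt at the repository root can declare one sub-project per program, and `sbt package` then produces a separate jar under each sub-project's target directory. A minimal sketch reusing the settings from the two .sbt files in the thread; the sub-project directory names are assumptions:

```
// build.sbt at the project root -- a minimal multi-project sketch (sbt 0.13).
lazy val commonSettings = Seq(
  version := "1.0",
  scalaVersion := "2.10.4",
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"
)

// Each program lives in its own directory, e.g.
//   simpleA/src/main/scala/SimpleApp_A.scala
//   simpleB/src/main/scala/SimpleApp_B.scala   (layout assumed)
lazy val simpleA = (project in file("simpleA"))
  .settings(commonSettings: _*)
  .settings(name := "Simple Project A")

lazy val simpleB = (project in file("simpleB"))
  .settings(commonSettings: _*)
  .settings(name := "Simple Project B")

// Aggregate so that a plain `sbt package` builds both jars.
lazy val root = (project in file("."))
  .aggregate(simpleA, simpleB)
```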
multiple programs compilation by sbt.
Hi,
Following the Quick Start guide:
https://spark.apache.org/docs/latest/quick-start.html

I could compile and run a Spark program successfully; now my question is how to compile multiple programs with sbt in one go. E.g., two programs as:

./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp_A.scala
./src/main/scala/SimpleApp_B.scala

Hopefully with "sbt package" I will get two .jar files, one for each of the source programs, so I can run them separately in Spark. I tried to create two .sbt files, one for each program, but found that only one .jar file is created.

./simpleA.sbt
name := "Simple Project A"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

./simpleB.sbt
name := "Simple Project B"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1"

Does anybody know how to do it?

Cheers,
Dan
Error: no snappyjava in java.library.path
Hi, All,
When I run a small program in spark-shell, I got the following error:

...
Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1886)
  at java.lang.Runtime.loadLibrary0(Runtime.java:849)
  at java.lang.System.loadLibrary(System.java:1088)
  at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)
  ... 29 more
...

I see the file is actually there under my Hadoop installation dir, e.g.:

./hadoop-2.5.0-cdh5.2.0/share/hadoop/mapreduce2/lib/snappy-java-1.0.4.1.jar
./hadoop-2.5.0-cdh5.2.0/share/hadoop/mapreduce1/lib/snappy-java-1.0.4.1.jar
./hadoop-2.5.0-cdh5.2.0/share/hadoop/kms/tomcat/webapps/kms/WEB-INF/lib/snappy-java-1.0.4.1.jar
./hadoop-2.5.0-cdh5.2.0/share/hadoop/common/lib/snappy-java-1.0.4.1.jar
./hadoop-2.5.0-cdh5.2.0/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar
./hadoop-2.5.0-cdh5.2.0/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar

But even after I included one of the above paths in $CLASSPATH, the error is still there. So how should the *PATH*s be set to resolve it? Thanks!

$ echo $CLASSPATH
/home/ubuntu/hadoop-2.5.0-cdh5.2.0/share/hadoop/mapreduce/lib

Cheers,
Dan
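One possible explanation, offered as a guess: snappy-java carries its native library inside the jar and extracts it at load time, so the jar must be visible to the Spark driver and executor JVMs themselves; the shell's $CLASSPATH is not propagated to them. Putting the jar on Spark's own classpath through the extraClassPath settings (or --driver-class-path at launch time) is the usual route. A sketch; the absolute jar path below is assumed from the $CLASSPATH shown in the post:

```
import org.apache.spark.{SparkConf, SparkContext}

object SnappyClasspathExample {
  def main(args: Array[String]): Unit = {
    // Assumed absolute path, pieced together from the post; adjust to your layout.
    val snappyJar =
      "/home/ubuntu/hadoop-2.5.0-cdh5.2.0/share/hadoop/common/lib/snappy-java-1.0.4.1.jar"

    val conf = new SparkConf()
      .setAppName("SnappyClasspathExample")
      // Executors are launched after the context starts, so this setting reaches them.
      .set("spark.executor.extraClassPath", snappyJar)
    // The driver JVM's classpath must be fixed at launch time instead, e.g.
    //   spark-submit --driver-class-path <path-to>/snappy-java-1.0.4.1.jar ...
    // or spark.driver.extraClassPath in conf/spark-defaults.conf.

    val sc = new SparkContext(conf)
    // ... the actual job goes here ...
    sc.stop()
  }
}
```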
no snappyjava in java.library.path
Hi,
My Spark job failed with "no snappyjava in java.library.path":

Caused by: java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
  at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1857)
  at java.lang.Runtime.loadLibrary0(Runtime.java:870)
  at java.lang.System.loadLibrary(System.java:1119)
  at org.xerial.snappy.SnappyNativeLoader.loadLibrary(SnappyNativeLoader.java:52)

I'm running spark-1.1.1 on hadoop2.4. I found that the file is there and I have already included it in the CLASSPATH:

../hadoop/share/hadoop/tools/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/common/lib/snappy-java-1.0.4.1.jar
../hadoop/share/hadoop/httpfs/tomcat/webapps/webhdfs/WEB-INF/lib/snappy-java-1.0.4.1.jar

Did I miss anything, or should I set it in some other way?

Cheers,
Dan