Streaming problems running 24x7
Hello, I'm finding problems running a Spark Streaming job for more than a few hours (3 or 4). It begins working OK, but it degrades until failure. Some of the symptoms:

- Consumed memory and CPU keep getting higher and higher, and finally an error is thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being calculated.
- The delay shown in the web UI also keeps increasing.
- After some hours disk space is being consumed. There are a lot of directories with names like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c

The job basically reads information from a Kafka topic and calculates several topN tables for some key and value fields related with netflow data. Some of the parameters are:

- batch interval: 10 seconds
- window calculation: 1 minute
- spark.cleaner.ttl: 5 minutes

The execution is standalone on one machine (16GB RAM, 12 cores), and the command to run it is as follows:

/opt/spark/bin/spark-submit --driver-java-options -XX:+UseCompressedOops --jars $JARS --class $APPCLASS --master local[2] $APPJAR

Does someone have some clues about the problem? I don't know if it is a configuration problem or some error in the code that is causing memory leaks. Thank you in advance! Miquel

PD: the code is basically this:

import java.io.{File, FileInputStream, PrintWriter}
import java.util.Properties

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object NetflowTopn {

  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1
  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String, Int]()
  hm += ("unix_secs"   -> 0)
  hm += ("unix_nsecs"  -> 1)
  hm += ("sysuptime"   -> 2)
  hm += ("exaddr"      -> 3)
  hm += ("dpkts"       -> 4)
  hm += ("doctets"     -> 5)
  hm += ("first"       -> 6)
  hm += ("last"        -> 7)
  hm += ("engine_type" -> 8)
  hm += ("engine_id"   -> 9)
  hm += ("srcaddr"     -> 10)
  hm += ("dstaddr"     -> 11)
  hm += ("nexthop"     -> 12)
  hm += ("input"       -> 13)
  hm += ("output"      -> 14)
  hm += ("srcport"     -> 15)
  hm += ("dstport"     -> 16)
  hm += ("prot"        -> 17)
  hm += ("tos"         -> 18)
  hm += ("tcp_flags"   -> 19)
  hm += ("src_mask"    -> 20)
  hm += ("dst_mask"    -> 21)
  hm += ("src_as"      -> 22)
  hm += ("dst_as"      -> 23)

  def getKey(lcamps: Array[String], camp: String): String = {
    if (camp == "total") "total"
    else lcamps(hm(camp))
  }

  def getVal(lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") 1L
    else lcamps(hm(camp)).toLong
  }

  def getKeyVal(line: String, keycamps: List[String], valcamp: String) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
  }

  def writeOutput(data: Array[(Long, String)], keycamps_str: String,
                  csvheader: String, valcamp: String, prefix: String) = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1)
    val ftmpf = new File(f1 + ts)
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach { t => pw.println(t._2 + "," + t._1) }
    pw.close()
    ftmpf.renameTo(f1f)
  }

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn <apppath>")
      System.exit(1)
    }
    appPath = args(0)

    try {
      val prop = new Properties()
      prop.load(new FileInputStream(appPath + "/conf/app.properties"))
      zkQuorum   = prop.getProperty("KAFKA_HOST")
      group      = prop.getProperty("KAFKA_GROUP")
      topics     = prop.getProperty("KAFKA_TOPIC")
      numThreads = prop.getProperty("THREADS").toInt
    } catch {
      case e: Exception =>
        e.printStackTrace()
        sys.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("netflow-topn")
      .set("spark.default.parallelism", "2")
      .set("spark.rdd.compress", "true")
      .set("spark.streaming.unpersist", "true")
      .set("spark.cleaner.ttl", "300")

    val ssc = new StreamingContext(sparkConf, Seconds(batch_interval))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val kafpar = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> group,
      "zookeeper.connection.timeout.ms" -> "5000",
      "auto.commit.interval.ms" -> "6",
      "auto.offset.reset" -> "largest"
    )

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafpar, topicMap, StorageLevel.MEMORY_ONLY_SER).map(_._2).cache()

    val ll_keycamps = List(
      List("srcaddr", "dstaddr"),
      List("dstaddr"),
      List("srcaddr"),
      List("srcport"
Re: Microsoft SQL jdbc support from spark sql
I was running the spark shell and sql with the --jars option containing the paths when I got my error. I am not sure what the correct way to add jars is. I tried placing the jar inside the directory you said, but I still get the error. I will give the code you posted a try. Thanks.
RE: How to do dispatching in Streaming?
And yet another way is to demultiplex at one point, which will yield separate DStreams for each message type that you can then process in independent DAG pipelines, in the following way:

MessageType1DStream = MainDStream.filter(message type1)
MessageType2DStream = MainDStream.filter(message type2)
MessageType3DStream = MainDStream.filter(message type3)

Then proceed with your processing independently on MessageType1DStream, MessageType2DStream and MessageType3DStream, i.e. each of them is a starting point of a new DAG pipeline running in parallel.

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Thursday, April 16, 2015 12:52 AM
To: Jianshi Huang
Cc: user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

It may be worthwhile to architect the computation in a different way.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // do different things for each record based on filters
  }
}

TD

On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:
Hi, I have a Kafka topic that contains dozens of different types of messages, and for each one I'll need to create a DStream. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream -> multiple DStreams)
Thanks,
-- Jianshi Huang
LinkedIn: jianshi Twitter: @jshuang Github  Blog: http://huangjs.github.com/
Re: How to do dispatching in Streaming?
From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. Extending the example of TD, something like this:

dstream.foreachRDD { rdd =>
  rdd.cache()
  messageType.foreach { msgTyp =>
    val selection = rdd.filter(msgTyp.match(_))
    selection.foreach { ... }
  }
  rdd.unpersist()
}

I would discourage the use of:

MessageType1DStream = MainDStream.filter(message type1)
MessageType2DStream = MainDStream.filter(message type2)
MessageType3DStream = MainDStream.filter(message type3)

Because it will be a lot more work to process on the Spark side. Each DStream will schedule tasks for each partition, resulting in #dstreams x #partitions x #stages tasks instead of the #partitions x #stages with the approach presented above.

-kr, Gerard.

On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote:
And yet another way is to demultiplex at one point, which will yield separate DStreams for each message type that you can then process in independent DAG pipelines ...
Re: Microsoft SQL jdbc support from spark sql
I am running the queries from spark-sql. I don't think it can communicate with the thrift server. Can you tell me how I should run the queries to make it work?
Re: Microsoft SQL jdbc support from spark sql
Looks like a good option. BTW v3.0 is around the corner. http://slick.typesafe.com/news/2015/04/02/slick-3.0.0-RC3-released.html Thanks
Spark SQL query key/value in Map
Hi, I'm new to both Cassandra and Spark and am experimenting with what Spark SQL can do, as it will affect my Cassandra data model. What I need is a model that can accept arbitrary fields, similar to Postgres's Hstore. Right now, I'm trying out the map type in Cassandra, but I'm getting the exception below when running my Spark SQL:

java.lang.RuntimeException: Can't access nested field in type MapType(StringType,StringType,true)

The schema I have now is:

root
 |-- device_id: integer (nullable = true)
 |-- event_date: string (nullable = true)
 |-- fields: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

And my Spark SQL is:

SELECT fields from raw_device_data where fields.driver = 'driver1'

From what I gather, this should work for a JSON based RDD (https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html). Is this not supported for a Cassandra map type?
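[Editor's hedged note, not from the thread: in Spark SQL the dot notation is reserved for struct fields, while map columns are usually indexed with bracket syntax, so a variant of the query above that may work against the same schema is:]

SELECT fields FROM raw_device_data WHERE fields['driver'] = 'driver1'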
RE: How to do dispatching in Streaming?
Ooops – what does “more work” mean in a Parallel Programming paradigm, and does it always translate into “inefficiency”?

Here are a few laws of physics in this space:

1. More Work, if done AT THE SAME time AND fully utilizing the cluster resources, is a GOOD thing
2. More Work which can not be done at the same time and has to be processed sequentially is a BAD thing

So the key is whether it is about 1 or 2, and if it is about 1, whether it leads to e.g. higher throughput and lower latency or not.

Regards, Evo Eftimov

From: Gerard Maas [mailto:gerard.m...@gmail.com]
Sent: Thursday, April 16, 2015 10:41 AM
To: Evo Eftimov
Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. ...
Re: Passing Elastic Search Mappings in Spark Conf
Thanks Nick. I understand that we can configure the index by creating the index with the mapping first. I thought it would be a good feature to add to es-hadoop/es-spark, as we could then have the full mapping and code in a single place, especially for simple mappings on a particular field. It makes sense to create the mapping first for ETL jobs. But for data scientists it might be useful to have it all in one place when we have to recreate the indexes multiple times on changing logic.

On Thu, Apr 16, 2015 at 6:52 AM, Nick Pentreath nick.pentre...@gmail.com wrote:
If you want to specify a mapping you must first create the mappings for your index types before indexing. As far as I know there is no way to specify this via es-hadoop. But it's best practice to explicitly create mappings prior to indexing, or to use index templates when dynamically creating indexes.
— Sent from Mailbox

On Thu, Apr 16, 2015 at 1:14 AM, Deepak Subhramanian deepak.subhraman...@gmail.com wrote:
Hi, Is there a way to pass the mapping to define a field as not analyzed with es-spark settings? I am just wondering if I can set the mapping type for a field as not analyzed using the set function in SparkConf, similar to the other es settings.

val sconf = new SparkConf()
  .setMaster("local[1]")
  .setAppName("Load Data To ES")
  .set("spark.ui.port", "4141")
  .set("es.index.auto.create", "true")
  .set("es.net.http.auth.user", "es_admin")
  .set("es.index.auto.create", "true")
  .set("es.mapping.names", "CREATED_DATE:@timestamp")

Thanks, Deepak Subhramanian

-- Deepak Subhramanian
Re: Streaming problems running 24x7
I used to hit this issue when my processing time exceeded the batch duration. Here are a few workarounds:

- Use storage level MEMORY_AND_DISK
- Enable WAL and checkpointing

The above two will slow things down a little. If you want low latency, what you can try instead is:

- Use storage level MEMORY_ONLY_2 (at least it replicates the blocks)
- Tachyon-based off-heap storage (haven't tried this, but will let you know)

Also, from Spark 1.3.1 they have purged the old WAL and it has better performance. You could try that as well. A sketch of the first two workarounds follows.
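[Editor's note: a minimal sketch of those first two workarounds, not part of the original reply. It reuses the receiver from Miquel's code; the checkpoint directory is a placeholder, and kafpar/topicMap are the values defined in that post.]

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("netflow-topn")
  // write received blocks to a write-ahead log in the checkpoint dir
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/streaming-checkpoint")  // required for the WAL; use HDFS in production

// MEMORY_AND_DISK_SER lets blocks spill to disk instead of being dropped,
// which is what later surfaces as "Could not compute split, block ... not found"
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafpar, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)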
On 16 Apr 2015 14:10, Miquel miquel.gonza...@tecsidel.es wrote:
Hello, I'm finding problems running a Spark Streaming job for more than a few hours (3 or 4). It begins working OK, but it degrades until failure. ...
RE: How to do dispatching in Streaming?
Also you can have each message type in a different topic (this needs to be arranged upstream from your Spark Streaming app, i.e. in the publishing systems and the messaging brokers), and then for each topic you can have a dedicated instance of InputReceiverDStream, which will be the start of a dedicated DAG pipeline instance for every message type. Moreover, each such DAG pipeline instance will run in parallel with the others.

From: Tathagata Das [mailto:t...@databricks.com]
Sent: Thursday, April 16, 2015 12:52 AM
To: Jianshi Huang
Cc: user; Shao, Saisai; Huang Jie
Subject: Re: How to do dispatching in Streaming?

It may be worthwhile to architect the computation in a different way. ...
Re: executor failed, cannot find compute-classpath.sh
Hi, has this issue been resolved? I am currently running into similar problems. I am using spark-1.3.0-bin-hadoop2.4 on Windows and Ubuntu. I have set up all paths on my Windows machine in an identical manner as on my Ubuntu server (using cygwin, so everything is somewhere under /usr/local/spark...). However, when I run the Pi example, I get this when starting the command from cygwin:

bin/spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master spark://myServer:7077 file:///home/spark/spark-examples-1.3.0-hadoop2.4.0.jar

Driver successfully submitted as driver-20150416112240-0007
... waiting before polling master for driver state
... polling master for driver state
State of driver-20150416112240-0007 is ERROR
Exception from cluster was: org.apache.spark.SparkException: Process List(/usr/local/spark-1.3.0-bin-hadoop2.4/bin/compute-classpath.sh) exited with code 127
org.apache.spark.SparkException: Process List(/usr/local/spark-1.3.0-bin-hadoop2.4/bin/compute-classpath.sh) exited with code 127

When I use the exact same command on the server, the job runs just fine. I confirmed that /usr/local/spark-1.3.0-bin-hadoop2.4/bin/compute-classpath.sh runs on both my local machine and the server. What is missing?

Thanks, Tim
Re: Save org.apache.spark.mllib.linalg.Matri to a file
Thank you very much for your suggestions, Ignacio! I have posted my solution here:
http://stackoverflow.com/questions/29649904/save-spark-org-apache-spark-mllib-linalg-matrix-to-a-file/29671193#29671193
Best regards, Florin

On Wed, Apr 15, 2015 at 5:28 PM, Ignacio Blasco elnopin...@gmail.com wrote:
You can turn the Matrix into an Array with .toArray and then:
1. Write it using Scala/Java IO to the local disk of the driver
2. parallelize it and use .saveAsTextFile

2015-04-15 14:16 GMT+02:00 Spico Florin spicoflo...@gmail.com:
Hello! The result of correlation in Spark MLlib is of type org.apache.spark.mllib.linalg.Matrix. (see http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations)

val data: RDD[Vector] = ...
val correlMatrix: Matrix = Statistics.corr(data, "pearson")

I would like to save the result into a file. How can I do this? Thanks, Florin
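[Editor's note: a minimal sketch of Ignacio's second suggestion, not from the thread itself. It assumes `sc` is an active SparkContext and `correlMatrix` is the Matrix returned by Statistics.corr above; the output path is a placeholder.]

import org.apache.spark.mllib.linalg.Matrix

// format the matrix row by row as CSV lines
def matrixToCsvLines(m: Matrix): Seq[String] =
  (0 until m.numRows).map { i =>
    (0 until m.numCols).map(j => m(i, j)).mkString(",")
  }

// a single partition keeps the output in one part file
sc.parallelize(matrixToCsvLines(correlMatrix), 1).saveAsTextFile("/tmp/correlMatrix")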
Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0
It's jTDS 1.3.1; http://sourceforge.net/projects/jtds/files/jtds/1.3.1/

I put that jar in /tmp on the driver/machine I'm running spark shell from. Then I ran with:

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

So I'm guessing that --jars doesn't set the class path for the primordial class loader, and because the driver is only on the class path in 'user land', it can't be found. Thinking a workaround would be to merge my Spark assembly jar with the jTDS driver... but it seems like a hack. The other thing I notice is there is --files, which lets me pass around files with the YARN distribute, so I'm thinking I can somehow use this if --jars doesn't work. Really I need to understand how the Spark class path is set when running on YARN.

From: ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
Date: Thursday, 16 April 2015 3:02 pm
To: Nathan nathan.mccar...@quantium.com.au
Cc: user@spark.apache.org
Subject: Re: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Can you provide the JDBC connector jar version? Possibly the full JAR name and the full command you ran Spark with?

On Wed, Apr 15, 2015 at 11:27 AM, Nathan McCarthy nathan.mccar...@quantium.com.au wrote:
Just an update, tried with the old JdbcRDD and that worked fine.

From: Nathan nathan.mccar...@quantium.com.au
Date: Wednesday, 15 April 2015 1:57 pm
To: user@spark.apache.org
Subject: SparkSQL JDBC Datasources API when running on YARN - Spark 1.3.0

Hi guys, Trying to use a Spark SQL context's .load("jdbc", ...) method to create a DF from a JDBC data source. All seems to work well locally (master = local[*]), however as soon as we try and run on YARN we have problems. We seem to be running into problems with the class path and loading up the JDBC driver. I'm using the jTDS 1.3.1 driver, net.sourceforge.jtds.jdbc.Driver.

./bin/spark-shell --jars /tmp/jtds-1.3.1.jar --master yarn-client

When trying to run I get an exception:

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.sql.SQLException: No suitable driver found for jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd

Thinking maybe we need to force-load the driver, if I supply "driver" -> "net.sourceforge.jtds.jdbc.Driver" to .load we get:

scala> sqlContext.load("jdbc", Map("url" -> "jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd", "driver" -> "net.sourceforge.jtds.jdbc.Driver", "dbtable" -> "CUBE.DIM_SUPER_STORE_TBL"))
java.lang.ClassNotFoundException: net.sourceforge.jtds.jdbc.Driver
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:191)
 at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:97)
 at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:290)
 at org.apache.spark.sql.SQLContext.load(SQLContext.scala:679)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(<console>:21)

Yet if I run a Class.forName() just from the shell:

scala> Class.forName("net.sourceforge.jtds.jdbc.Driver")
res1: Class[_] = class net.sourceforge.jtds.jdbc.Driver

No problem finding the JAR. I've tried in both the shell, and running with spark-submit (packing the driver in with my application as a fat JAR). Nothing seems to work. I can also get a connection in the driver/shell no problem:

scala> import java.sql.DriverManager
import java.sql.DriverManager
scala> DriverManager.getConnection("jdbc:jtds:sqlserver://blah:1433/MyDB;user=usr;password=pwd")
res3: java.sql.Connection = net.sourceforge.jtds.jdbc.JtdsConnection@2a67ecd0

I'm probably missing some class path setting here. In jdbc.DefaultSource.createRelation it looks like the call to Class.forName doesn't specify a class loader, so it just uses the default Java behaviour to reflectively get the class loader. It almost feels like it's using a different class loader. I also tried seeing if the class path was there on all my executors by running:

import scala.collection.JavaConverters._
sc.parallelize(Seq(1,2,3,4)).flatMap(_ => java.sql.DriverManager.getDrivers().asScala.map(d => s"$d |
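[Editor's hedged note, not confirmed in this thread: since the JDBC data source in Spark 1.3 resolves the driver class through the primordial class loader, the usual workaround is to put the jar on the driver and executor class paths explicitly, in addition to --jars, e.g.:]

./bin/spark-shell --master yarn-client \
  --jars /tmp/jtds-1.3.1.jar \
  --driver-class-path /tmp/jtds-1.3.1.jar \
  --conf spark.executor.extraClassPath=/tmp/jtds-1.3.1.jar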
[SQL] DROP TABLE should also uncache table
Hi, As per JIRA this issue is resolved, but I am still facing it: SPARK-2734 - DROP TABLE should also uncache table.

-- Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
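[Editor's note: a possible interim workaround, not from the thread, assuming a SQLContext/HiveContext named sqlContext and a cached table named "mytable":]

sqlContext.uncacheTable("mytable")    // release the in-memory columnar cache first
sqlContext.sql("DROP TABLE mytable")  // then drop the table itself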
custom input format in spark
Hi, How do I specify a custom input format in Spark, and how do I control isSplitable? I need to read files from HDFS whose format is custom, and the requirement is that a file should not be split in between, so that an executor node gets that whole file in its partition of the input dir. Can anyone share a sample in Java? Thanks, Shushant
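[Editor's note: a minimal sketch of the idea, in Scala since the rest of this digest's code is Scala; the same override applies in a Java subclass. Extending the new-API TextInputFormat is an assumption here — a truly custom binary format would extend FileInputFormat and supply its own RecordReader.]

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// returning false guarantees each file is read whole by a single task
class WholeFileTextInputFormat extends TextInputFormat {
  override protected def isSplitable(context: JobContext, file: Path): Boolean = false
}

// usage: one input split (and hence one partition) per file
val rdd = sc.newAPIHadoopFile("hdfs:///input/dir",
  classOf[WholeFileTextInputFormat], classOf[LongWritable], classOf[Text])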
ClassCastException processing date fields using spark SQL since 1.3.0
Hello guys, after upgrading Spark to 1.3.0 (and performing the necessary code changes) an issue appeared making me unable to handle Date fields (java.sql.Date) with the Spark SQL module. An exception appears in the console when I try to execute an SQL query on a DataFrame (see below). When I tried to examine the cause of the exception, I found out that it happens when the framework tries to collect column statistics on DataFrames - in particular, the method gatherStats in org.apache.spark.sql.columnar.DateColumnStats is inherited from IntColumnStats and thus handles the column value as an Integer, which causes this kind of error.

Now the question is - what is the right type for a Date field in Spark SQL DataFrames?
- According to the documentation for org.apache.spark.sql.types.DateType, it represents java.sql.Date (which doesn't work now, though it worked fine before Spark 1.3.0).
- JvmType in org.apache.spark.sql.types.DateType points to Int.
- According to the implementation of JdbcRDD, it looks like they still use DateType for java.sql.Date fields, so it seems to me that an attempt to read from a JDBC table containing date fields using Spark SQL will most likely end up with an error as well.

So what is the type handled by org.apache.spark.sql.types.DateType? Is it Int or is it still java.sql.Date? If it is an Int - what is the exact meaning of the number, and how do I convert from/to Date (sql.Date, util.Date, JodaTime...)? Thank you for your help. Best regards, R. Krist

Stack trace of the exception being reported since the upgrade to 1.3.0:

java.lang.ClassCastException: java.sql.Date cannot be cast to java.lang.Integer
 at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:105) ~[scala-library-2.11.6.jar:na]
 at org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:83) ~[spark-catalyst_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.IntColumnStats.gatherStats(ColumnStats.scala:191) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:135) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:111) ~[spark-sql_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.11-1.3.0.jar:1.3.0]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11]
 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11]
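[Editor's hedged note, not an authoritative answer from the thread: my understanding is that in Spark 1.3 DateType is represented internally as an Int counting days since the Unix epoch (1970-01-01). A rough conversion sketch, ignoring time-zone subtleties:]

import java.sql.Date

val millisPerDay = 24L * 60 * 60 * 1000
def dateToDays(d: Date): Int = (d.getTime / millisPerDay).toInt   // java.sql.Date -> days since epoch
def daysToDate(days: Int): Date = new Date(days * millisPerDay)   // days since epoch -> java.sql.Date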
Re: ClassCastException processing date fields using spark SQL since 1.3.0
...one additional note: the implementation of org.apache.spark.sql.columnar.IntColumnStats is IMHO wrong. Small hint: what will the resulting upper and lower values be for a column containing no data (an empty RDD, or null values in an Int column across the whole RDD)? Shouldn't they be null?
MLLib SVMWithSGD : java.lang.OutOfMemoryError: Java heap space
Hi, I'm trying to train an SVM on the KDD2010 dataset (available from libsvm), but I'm getting a java.lang.OutOfMemoryError: Java heap space error. The dataset is really sparse and has around 8 million data points and 20 million features. I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM). I have used both Spark's SVMWithSGD and Liblinear's Spark implementation, and I get the java.lang.OutOfMemoryError: Java heap space error for both. I have used the following settings:

executor-memory: 60G
num-executors: 64
(and other default settings)

I also tried increasing the number of partitions, and tried with a reduced dataset of half a million data points, but I'm still getting the same error. Here is the stack trace for Spark's SVMWithSGD:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
 at org.apache.spark.mllib.optimization.GradientDescent$.runMiniBatchSGD(GradientDescent.scala:182)
 at org.apache.spark.mllib.optimization.GradientDescent.optimize(GradientDescent.scala:107)
 at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:263)
 at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
 at org.apache.spark.mllib.classification.SVMWithSGD$.train(SVM.scala:201)
 at org.apache.spark.mllib.classification.SVMWithSGD$.train(SVM.scala:235)
 at org.apache.spark.mllib.classification.SVMWithSGD.train(SVM.scala)
 at org.linearsvm.SVMClassifier.main(SVMClassifier.java:39)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:622)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And the stack trace for Liblinear's Spark implementation:

java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
 at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
 at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
 at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
 at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
 at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
 at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
 at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
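[Editor's hedged observation, not from the thread: the first trace starts with "Exception in thread main", i.e. the OOM happens on the driver while runMiniBatchSGD aggregates gradient/weight vectors (a dense vector of 20 million doubles is about 160 MB), so executor memory alone won't help; raising driver memory might, e.g.:]

./bin/spark-submit --class org.linearsvm.SVMClassifier \
  --driver-memory 16g \
  --executor-memory 60g --num-executors 64 \
  your-assembly.jar    # jar name is a placeholder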
Data partitioning and node tracking in Spark-GraphX
I have a big data file, and I aim to create an index on the data. I want to partition the data based on a user-defined function in Spark-GraphX (Scala). Further, I want to keep track of the node to which a particular data partition is sent and on which it is processed, so I can fetch the required data by accessing the right node and data partition. How can I achieve this? Any help in this regard will be highly appreciated.
RE: Data partitioning and node tracking in Spark-GraphX
How do you intend to fetch the required data - from within Spark, or using an app / code / module outside Spark?

-----Original Message-----
From: mas [mailto:mas.ha...@gmail.com]
Sent: Thursday, April 16, 2015 4:08 PM
To: user@spark.apache.org
Subject: Data partitioning and node tracking in Spark-GraphX

I have a big data file, and I aim to create an index on the data. ...
Problem with Spark SQL UserDefinedType and sbt assembly
Dear all, Here is an issue that drives me mad. I wrote a UserDefinedType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write it into a parquet file. When I run my code directly inside IDEA everything works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error:

15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found

{"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
^
 at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
 at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
 at scala.util.Try.getOrElse(Try.scala:77)
 at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
 at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
 at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
 at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
 at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)
 at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
 at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:64)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)
Re: Data partitioning and node tracking in Spark-GraphX
Thanks a lot for the reply. Indeed it is useful, but to be more precise, I have 3D data and want to index it using an octree. Thus I aim to build a two-level indexing mechanism, i.e. first at the global level I want to partition and send the data to the nodes, then at node level I again want to use an octree to index my data locally. Could you please elaborate the solution in this context?

On Thu, Apr 16, 2015 at 5:23 PM, Evo Eftimov evo.efti...@isecc.com wrote:
Well you can use a [Key, Value] RDD and partition it based on a hash function on the Key ...

-- Regards, Muhammad Aamir
Re: How to join RDD keyValuePairs efficiently
This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set.

On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote:
Does anybody have a solution for this? ...
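[Editor's note: a minimal sketch of Sean's suggestion, reusing the names from Ningjun's code below — allDocs: RDD[Document] where each Document has an id: String, and docids: RDD[String] with fewer than 100 ids.]

val ids: Set[String] = docids.collect().toSet               // small enough to hold on the driver
val idsBc = sc.broadcast(ids)                               // shipped once per executor
val found = allDocs.filter(d => idsBc.value.contains(d.id)) // one parallel pass, no shuffle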
Spark on Windows
We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. However, on Mac/Linux we are on Scala 2.11.6 (we built Spark from the sources). On Windows, despite our best efforts, we cannot get Spark 1.3.0 built from sources working for Scala 2.11.6. Spark has too many moving parts and dependencies!

When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason Spark 1.3.0 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.11 bundle? If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share it with the group? Thanks, arun
RE: How to join RDD keyValuePairs efficiently
Does anybody have a solution for this?

From: Wang, Ningjun (LNG-NPV)
Sent: Tuesday, April 14, 2015 10:41 AM
To: user@spark.apache.org
Subject: How to join RDD keyValuePairs efficiently

I have an RDD that contains millions of Document objects. Each document has a unique id that is a string. I need to find the documents by ids quickly. Currently I use an RDD join, as follows.

First I save the RDD as an object file:

val allDocs: RDD[Document] = getDocs()  // this RDD contains 7 million Document objects
allDocs.saveAsObjectFile("/temp/allDocs.obj")

Then I wrote a function to find documents by ids:

def findDocumentsByIds(docids: RDD[String]) = {
  // docids contains less than 100 items
  val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
  val idAndDocs = allDocs.keyBy(d => d.id)
  docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
}

I found that this is very slow. I suspect it scans the entire 7 million Document objects in "/temp/allDocs.obj" sequentially to find the desired documents. Is there any efficient way to do this?

One option I am thinking of: instead of storing the RDD[Document] as an object file, store each document in a separate file with the filename equal to the docid. This way I can find a document quickly by docid. However, this means I need to save the RDD to 7 million small files, which will take a very long time to save and may cause IO problems with so many small files. Is there any other way? Ningjun
RE: How to join RDD keyValuePairs efficiently
Ningjun, to speed up your current design you can do the following:

1. Partition the large doc RDD based on a hash function on the key, i.e. the docid
2. Persist the large dataset in memory, to be available for subsequent queries without reloading and repartitioning for every search query
3. Partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same key
4. Run the join - the match is not going to be sequential, it is based on the hash of the key; moreover, RDD elements with the same key will be collocated on the same cluster node

OR simply go for Sean's suggestion - under the hood it works in a slightly different way - the filter is executed in mappers running in parallel on every node, and by passing the small doc IDs to each filter (mapper) you essentially replicate them on every node, so each mapper instance has its own copy and runs with it when filtering.

And finally, you can prototype both options described above and measure and compare their performance.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Thursday, April 16, 2015 5:02 PM
To: Wang, Ningjun (LNG-NPV)
Cc: user@spark.apache.org
Subject: Re: How to join RDD keyValuePairs efficiently

This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call ...
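[Editor's note: a minimal sketch of Evo's steps 1-4 above, reusing the allDocs and docids names from Ningjun's code; the partition count is an assumption.]

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(48)
val idAndDocs = allDocs.keyBy(_.id).partitionBy(part).persist() // steps 1-2: hash-partition and cache
val queries = docids.map(id => (id, ())).partitionBy(part)      // step 3: same partitioner on the small side
val found = queries.join(idAndDocs).map(_._2._2)                // step 4: copartitioned join, no shuffle of the big side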
Re: Data partitioning and node tracking in Spark-GraphX
I want to use Spark functions/APIs to do this task. My basic purpose is to index the data and divide and send it to multiple nodes. Then, at the time of access, I want to reach the right node and data partition. I don't have any clue how to do this. Thanks,

On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com wrote:
How do you intend to fetch the required data - from within Spark, or using an app / code / module outside Spark? ...

-- Regards, Muhammad Aamir
RE: Data partitioning and node tracking in Spark-GraphX
Well you can use a [Key, Value] RDD and partition it based on a hash function on the Key, and even with a specific number of partitions (and hence cluster nodes). This will a) index the data, and b) divide it and send it to multiple nodes. Re your last requirement: in a cluster programming environment/framework, your app code should not be bothered about which physical node exactly a partition resides on.

Regards, Evo Eftimov

From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com]
Sent: Thursday, April 16, 2015 4:20 PM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Data partitioning and node tracking in Spark-GraphX

I want to use Spark functions/APIs to do this task. My basic purpose is to index the data and divide and send it to multiple nodes. ...
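[Editor's note: a minimal sketch of the above, not from the thread; points, cellId and the partition count are assumed placeholders — cellId stands for the user-defined function that maps a 3-D point to a partition key.]

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// key each element by the cell id your function assigns to it
val keyed: RDD[(Int, Array[Double])] = points.map(p => (cellId(p), p))

// hash-partition on the key and keep the result in memory for repeated queries
val indexed = keyed.partitionBy(new HashPartitioner(12)).persist()

// fetch by key; with a partitioner set, lookup() scans only the one matching partition
val hits: Seq[Array[Double]] = indexed.lookup(someCellId)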
RE: Data partitioning and node tracking in Spark-GraphX
Well you can have a two level index structure, still without any need for physical cluster node awareness. The Level 1 Index is the previously described partitioned [K,V] RDD – this gets you to the value (RDD element) you need on the respective cluster node. The Level 2 Index will be built and reside within the Value of each [K,V] RDD element – so after you retrieve the appropriate element from the appropriate cluster node based on the Level 1 Index, you then query the Value in the element based on the Level 2 Index. From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:32 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Data partitioning and node tracking in Spark-GraphX Thanks a lot for the reply. Indeed it is useful, but to be more precise I have 3D data and want to index it using an octree. Thus I aim to build a two level indexing mechanism, i.e. first at the global level I want to partition and send the data to the nodes, then at node level I again want to use an octree to index my data locally. Could you please elaborate the solution in this context? On Thu, Apr 16, 2015 at 5:23 PM, Evo Eftimov evo.efti...@isecc.com wrote: Well you can use a [Key, Value] RDD and partition it based on a hash function on the Key, and even with a specific number of partitions (and hence cluster nodes). This will a) index the data, b) divide it and send it to multiple nodes. Re your last requirement - in a cluster programming environment/framework your app code should not be concerned with exactly which physical node a partition resides on. Regards Evo Eftimov From: MUHAMMAD AAMIR [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:20 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: Data partitioning and node tracking in Spark-GraphX I want to use Spark functions/APIs to do this task. My basic purpose is to index the data and divide and send it to multiple nodes. Then at the time of accessing I want to reach the right node and data partition. I don't have any clue how to do this. Thanks, On Thu, Apr 16, 2015 at 5:13 PM, Evo Eftimov evo.efti...@isecc.com wrote: How do you intend to fetch the required data - from within Spark or using an app / code / module outside Spark -Original Message- From: mas [mailto:mas.ha...@gmail.com] Sent: Thursday, April 16, 2015 4:08 PM To: user@spark.apache.org Subject: Data partitioning and node tracking in Spark-GraphX I have a big data file, i aim to create index on the data. I want to partition the data based on user defined function in Spark-GraphX (Scala). Further i want to keep track the node on which a particular data partition is send and being processed so i could fetch the required data by accessing the right node and data partition. How can i achieve this? Any help in this regard will be highly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Data-partitioning-and-node-tracking-in-Spark-GraphX-tp22527.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards, Muhammad Aamir CONFIDENTIALITY:This email is intended solely for the person(s) named and may be confidential and/or privileged.If you are not the intended recipient,please delete it,notify me and do not copy,use,or disclose its content.
-- Regards, Muhammad Aamir CONFIDENTIALITY:This email is intended solely for the person(s) named and may be confidential and/or privileged.If you are not the intended recipient,please delete it,notify me and do not copy,use,or disclose its content.
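A minimal Scala sketch of the two-level scheme described above, assuming a pair-producing key extractor and a hypothetical buildOctree helper for the per-element local index:

  import org.apache.spark.HashPartitioner

  // Level 1: hash-partition by key, so a lookup only touches the partition
  // (and hence the cluster node) that the key hashes to.
  val level1 = rdd.map(rec => (keyOf(rec), rec))   // keyOf is a hypothetical key extractor
    .partitionBy(new HashPartitioner(16))
    .cache()

  // Level 2: a local index (e.g. an octree) stored inside each value.
  val twoLevel = level1.mapValues(rec => buildOctree(rec))   // buildOctree is hypothetical

  // lookup() consults the partitioner and scans only the relevant partition.
  val hits = twoLevel.lookup(someKey)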
dataframe can not find fields after loading from hive
I have a data frame in which I load data from a hive table. And my issue is that the data frame seems to be missing the columns that I need to query. For example: val newdataset = dataset.where(dataset("label") === 1) gives me an error like the following: ERROR yarn.ApplicationMaster: User class threw exception: resolved attributes label missing from label, user_id, ... (the rest of the fields of my table) org.apache.spark.sql.AnalysisException: resolved attributes label missing from label, user_id, ... (the rest of the fields of my table) where we can see that the label field actually exists. I managed to solve this issue by updating my syntax to: val newdataset = dataset.where($"label" === 1) which works. However I cannot use this trick in all my queries. For example, when I try to do a unionAll of two subsets of the same data frame, the error I am getting is that all my fields are missing. Can someone tell me if I need to do some post processing after loading from hive in order to avoid this kind of error? Thanks -- Cesar Flores
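For reference, a hedged sketch of the two syntaxes above plus one commonly suggested workaround for the unionAll case, assuming a HiveContext named sqlContext (table and column names are illustrative):

  import sqlContext.implicits._   // enables the $"col" syntax

  val dataset = sqlContext.table("mytable")

  // Variant that raised the AnalysisException:
  val byAttr = dataset.where(dataset("label") === 1)

  // Variant that resolves the column by name and worked:
  val byName = dataset.where($"label" === 1)

  // For unionAll over two subsets of the same DataFrame, selecting the columns
  // first gives each branch freshly resolved attributes, which may avoid the error:
  val ones  = dataset.select("label", "user_id").where($"label" === 1)
  val zeros = dataset.select("label", "user_id").where($"label" === 0)
  val both  = ones.unionAll(zeros)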
Re: MLLib SVMWithSGD : java.lang.OutOfMemoryError: Java heap space
Try increasing your driver memory. Thanks Best Regards On Thu, Apr 16, 2015 at 6:09 PM, sarath sarathkrishn...@gmail.com wrote: Hi, I'm trying to train an SVM on the KDD2010 dataset (available from libsvm). But I'm getting a java.lang.OutOfMemoryError: Java heap space error. The dataset is really sparse and has around 8 million data points and 20 million features. I'm using a cluster of 8 nodes (each with 8 cores and 64G RAM). I have used both Spark's SVMWithSGD and Liblinear's Spark implementation and I'm getting java.lang.OutOfMemoryError: Java heap space for both. I have used the following settings: executor-memory - 60G, num-executors - 64, and other default settings. Also I tried increasing the number of partitions, and tried with a reduced dataset of half a million data points, but I'm still getting the same error. Here is the stack trace for Spark's SVMWithSGD:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.mllib.optimization.GradientDescent$.runMiniBatchSGD(GradientDescent.scala:182)
at org.apache.spark.mllib.optimization.GradientDescent.optimize(GradientDescent.scala:107)
at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:263)
at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
at org.apache.spark.mllib.classification.SVMWithSGD$.train(SVM.scala:201)
at org.apache.spark.mllib.classification.SVMWithSGD$.train(SVM.scala:235)
at org.apache.spark.mllib.classification.SVMWithSGD.train(SVM.scala)
at org.linearsvm.SVMClassifier.main(SVMClassifier.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:622)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And the stack trace for Liblinear's Spark implementation:

java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:329)
at org.apache.spark.network.BlockTransferService$$anon$1.onBlockFetchSuccess(BlockTransferService.scala:95)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:206)
at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:72)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:124)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:93)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at
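The first trace points at the driver (it is the main thread inside SparkSubmit), which matches Akhil's advice: with 20 million features the driver holds dense 20M-element weight and gradient vectors during runMiniBatchSGD. A hedged example of the corresponding submit flags (the jar name is illustrative; --num-executors applies on YARN):

  spark-submit --driver-memory 20g --executor-memory 60g --num-executors 64 \
    --class org.linearsvm.SVMClassifier svmclassifier.jar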
Custom partitioner
Hi All, I have an RDD which has 1 million keys, and each key is repeated for around 7,000 values, so in total there will be around 1M*7K records in the RDD. Each key is created from zipWithIndex, so keys run from 0 to M-1. The problem with zipWithIndex is that it produces Long keys, which are 8 bytes - can I reduce them to 4 bytes? Now, how can I make sure that records with the same key go to the same node, so that I can avoid shuffling? Also, how will the default partitioner work here? Regards jeetendra
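A hedged Scala sketch of a custom partitioner for this layout (the modulo scheme is an assumption, not something from the thread). Since the keys run from 0 to about a million, they would also fit in an Int (key.toInt) if 4-byte keys matter:

  import org.apache.spark.Partitioner

  // Routes every record with the same key to the same partition, so a later
  // join/groupByKey using the same partitioner needs no shuffle.
  class ModuloPartitioner(parts: Int) extends Partitioner {
    override def numPartitions: Int = parts
    override def getPartition(key: Any): Int = key match {
      case k: Long => (k % parts).toInt   // zipWithIndex keys are non-negative
      case other   => ((other.hashCode % parts) + parts) % parts
    }
  }

  val colocated = pairRdd.partitionBy(new ModuloPartitioner(48)).cache()   // pairRdd is hypothetical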
Re: custom input format in spark
Is it for spark? On Thu, Apr 16, 2015 at 10:05 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can simply override the isSplitable method in your custom inputformat class and make it return false. Here's a sample code snippet: http://stackoverflow.com/questions/17875277/reading-file-as-single-record-in-hadoop#answers-header Thanks Best Regards On Thu, Apr 16, 2015 at 4:18 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to specify custom input format in spark and control isSplitable in between file. Need to read a file from HDFS , file format is custom and requirement is file should not be split inbetween when a executor node gets that partition of input dir. Can anyone share a sample in java. Thanks Shushant
Re: Distinct is very slow
Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? You have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset. Thanks Best Regards On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote: I already checked and G is taking 1 secs for each task. is this too much? if yes how to avoid this? On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote: Open the driver ui and see which stage is taking time, you can look whether its adding any GC time etc. Thanks Best Regards On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have below code whether distinct is running for more time. blockingRdd is the combination of Long,String and it will have 400K records JavaPairRDDLong,Integer completeDataToprocess=blockingRdd.flatMapValues( new FunctionString, IterableInteger(){ @Override public IterableInteger call(String v1) throws Exception { return ckdao.getSingelkeyresult(v1); } }).distinct(32); I am running distinct on 800K records and its taking 2 hours on 16 cores and 20 GB RAM.
General configurations on CDH5 to achieve maximum Spark Performance
Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark performance while running on CDH5? In our environment, we made a lot of changes (and are still making them) to get decent performance; otherwise our 6 node dev cluster with default configurations lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regard will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any (your) computer. ***Please consider the environment before printing this email.***
Re: How to join RDD keyValuePairs efficiently
You could try repartitioning your RDD using a custom partitioner (HashPartitioner etc) and caching the dataset into memory to speed up the joins. Thanks Best Regards On Tue, Apr 14, 2015 at 8:10 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find documents by id quickly. Currently I use RDD join as follows. First I save the RDD as an object file:

  val allDocs: RDD[Document] = getDocs() // this RDD contains 7 million Document objects
  allDocs.saveAsObjectFile("/temp/allDocs.obj")

Then I wrote a function to find documents by ids:

  def findDocumentsByIds(docids: RDD[String]) = {
    // docids contains less than 100 items
    val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
    val idAndDocs = allDocs.keyBy(d => d.id)
    docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
  }

I found that this is very slow. I suspect it scans the entire 7 million Document objects in "/temp/allDocs.obj" sequentially to find the desired documents. Is there any efficient way to do this? One option I am thinking of is that instead of storing the RDD[Document] as an object file, I store each document in a separate file with filename equal to the docid. This way I can find a document quickly by docid. However this means I need to save the RDD to 7 million small files, which will take a very long time and may cause IO problems with so many small files. Is there any other way? Ningjun
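A hedged sketch of that suggestion applied to the function above (the partition count is illustrative):

  import org.apache.spark.HashPartitioner
  import org.apache.spark.rdd.RDD

  val part = new HashPartitioner(64)

  // Key, partition, and cache the documents once, so repeated id lookups hit
  // the same co-located, in-memory partitions instead of rescanning the file.
  val idAndDocs = allDocs.keyBy(_.id).partitionBy(part).cache()

  def findDocumentsByIds(docids: RDD[String]): RDD[Document] =
    docids.map(id => (id, id))
      .partitionBy(part)   // same partitioner, so the join adds no extra shuffle
      .join(idAndDocs)
      .map(_._2._2)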
Distinct is very slow
Hi All, I have the code below where distinct is running for a long time. blockingRdd is a (Long, String) pair RDD and it will have 400K records:

  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
    new Function<String, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> call(String v1) throws Exception {
        return ckdao.getSingelkeyresult(v1);
      }
    }).distinct(32);

I am running distinct on 800K records and it's taking 2 hours on 16 cores and 20 GB RAM.
Re: Distinct is very slow
Open the driver ui and see which stage is taking time, you can look whether its adding any GC time etc. Thanks Best Regards On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have below code whether distinct is running for more time. blockingRdd is the combination of Long,String and it will have 400K records JavaPairRDDLong,Integer completeDataToprocess=blockingRdd.flatMapValues( new FunctionString, IterableInteger(){ @Override public IterableInteger call(String v1) throws Exception { return ckdao.getSingelkeyresult(v1); } }).distinct(32); I am running distinct on 800K records and its taking 2 hours on 16 cores and 20 GB RAM.
Re: Distinct is very slow
I already checked and G is taking 1 secs for each task. is this too much? if yes how to avoid this? On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote: Open the driver ui and see which stage is taking time, you can look whether its adding any GC time etc. Thanks Best Regards On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have below code whether distinct is running for more time. blockingRdd is the combination of Long,String and it will have 400K records JavaPairRDDLong,Integer completeDataToprocess=blockingRdd.flatMapValues( new FunctionString, IterableInteger(){ @Override public IterableInteger call(String v1) throws Exception { return ckdao.getSingelkeyresult(v1); } }).distinct(32); I am running distinct on 800K records and its taking 2 hours on 16 cores and 20 GB RAM.
Re: Super slow caching in 1.3?
Hi Michael, Good question! We checked 1.2 and found that it is also slow caching the same flat parquet file. Caching other file formats of the same data was faster, by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (its just falling back to kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
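A hedged concrete form of the CACHE TABLE suggestion quoted above (the column list is purely illustrative; the point is that nested fields get projected out into flat columns):

  // Cache a flat projection of the nested Parquet data under a new name.
  hiveContext.sql(
    "CACHE TABLE flattenedTable AS SELECT id, price, seller.name AS seller_name FROM parquetTable")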
RE: General configurations on CDH5 to achieve maximum Spark Performance
Well there are a number of performance tuning guidelines in dedicated sections of the spark documentation - have you read and applied them? Secondly, any performance problem within a distributed cluster environment has two aspects: 1. Infrastructure 2. App Algorithms. You seem to be focusing only on 1, but what you said about the performance differences between a single laptop and the cluster points to potential algorithmic inefficiency in your app when e.g. distributing data and performing parallel processing. On a single laptop data moves instantly between workers because all worker instances run in the memory of a single machine .. Regards, Evo Eftimov From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:03 PM To: user@spark.apache.org Subject: General configurations on CDH5 to achieve maximum Spark Performance Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did lot of changes (and still doing it) to get decent performance otherwise our 6 node dev cluster with default configurations, lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regards will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any (your) computer. ***Please consider the environment before printing this email.***
Re: custom input format in spark
You can simply override the isSplitable method in your custom inputformat class and make it return false. Here's a sample code snippet: http://stackoverflow.com/questions/17875277/reading-file-as-single-record-in-hadoop#answers-header Thanks Best Regards On Thu, Apr 16, 2015 at 4:18 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to specify custom input format in spark and control isSplitable in between file. Need to read a file from HDFS , file format is custom and requirement is file should not be split inbetween when a executor node gets that partition of input dir. Can anyone share a sample in java. Thanks Shushant
Re: custom input format in spark
You can plug in native Hadoop input formats with Spark's sc.newAPIHadoopFile etc., which takes in the input format class. Thanks Best Regards On Thu, Apr 16, 2015 at 10:15 PM, Shushant Arora shushantaror...@gmail.com wrote: Is it for spark? On Thu, Apr 16, 2015 at 10:05 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can simply override the isSplitable method in your custom inputformat class and make it return false. Here's a sample code snippet: http://stackoverflow.com/questions/17875277/reading-file-as-single-record-in-hadoop#answers-header Thanks Best Regards On Thu, Apr 16, 2015 at 4:18 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi How to specify custom input format in spark and control isSplitable in between file. Need to read a file from HDFS , file format is custom and requirement is file should not be split inbetween when a executor node gets that partition of input dir. Can anyone share a sample in java. Thanks Shushant
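A hedged Scala sketch combining the two replies, a non-splittable input format plugged in through newAPIHadoopFile (key/value types are illustrative; the Java API has the same shape via JavaSparkContext.newAPIHadoopFile):

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.JobContext
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

  // Files read through this format are never split, so each task gets a whole file.
  class WholeFileTextInputFormat extends TextInputFormat {
    override def isSplitable(context: JobContext, file: Path): Boolean = false
  }

  val rdd = sc.newAPIHadoopFile(
    "hdfs:///input/dir",
    classOf[WholeFileTextInputFormat],
    classOf[LongWritable],
    classOf[Text])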
RE: Super slow caching in 1.3?
Michael what exactly do you mean by flattened version/structure here e.g.: 1. An Object with only primitive data types as attributes 2. An Object with no more than one level of other Objects as attributes 3. An Array/List of primitive types 4. An Array/List of Objects This question is in general about RDDs not necessarily RDDs in the context of SparkSQL When answering can you also score how bad the performance of each of the above options is -Original Message- From: Christian Perez [mailto:christ...@svds.com] Sent: Thursday, April 16, 2015 6:09 PM To: Michael Armbrust Cc: user Subject: Re: Super slow caching in 1.3? Hi Michael, Good question! We checked 1.2 and found that it is also slow cacheing the same flat parquet file. Caching other file formats of the same data were faster by up to a factor of ~2. Note that the parquet file was created in Impala but the other formats were written by Spark SQL. Cheers, Christian On Mon, Apr 6, 2015 at 6:17 PM, Michael Armbrust mich...@databricks.com wrote: Do you think you are seeing a regression from 1.2? Also, are you caching nested data or flat rows? The in-memory caching is not really designed for nested data and so performs pretty slowly here (its just falling back to kryo and even then there are some locking issues). If so, would it be possible to try caching a flattened version? CACHE TABLE flattenedTable AS SELECT ... FROM parquetTable On Mon, Apr 6, 2015 at 5:00 PM, Christian Perez christ...@svds.com wrote: Hi all, Has anyone else noticed very slow time to cache a Parquet file? It takes 14 s per 235 MB (1 block) uncompressed node local Parquet file on M2 EC2 instances. Or are my expectations way off... Cheers, Christian -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Christian Perez Silicon Valley Data Science Data Analyst christ...@svds.com @cp_phd - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: General configurations on CDH5 to achieve maximum Spark Performance
Thanks Evo. Yes, my concern is only regarding the infrastructure configurations. Basically, configuring Yarn (Node manager) + Spark is must and default setting never works. And what really happens, is we make changes as and when an issue is faced because of one of the numerous default configuration settings. And every time, we have to google a lot to decide on the right values :) Again, my issue is very centric to running Spark on Yarn in CDH5 environment. If you know a link that talks about optimum configuration settings for running Spark on Yarn (CDH5), please share the same. Thanks, Manish From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 10:38 PM To: Manish Gupta 8; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Well there are a number of performance tuning guidelines in dedicated sections of the spark documentation - have you read and applied them Secondly any performance problem within a distributed cluster environment has two aspects: 1. Infrastructure 2. App Algorithms You seem to be focusing only on 1, but what you said about the performance differences between single laptop and cluster points to potential algorithmic inefficiency in your app when e.g. distributing and performing parallel processing and data. On a single laptop data moves instantly between workers because all worker instances run in the memory of a single machine Regards, Evo Eftimov From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:03 PM To: user@spark.apache.orgmailto:user@spark.apache.org Subject: General configurations on CDH5 to achieve maximum Spark Performance Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did lot of changes (and still doing it) to get decent performance otherwise our 6 node dev cluster with default configurations, lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regards will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.commailto:mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any (your) computer. ***Please consider the environment before printing this email.***
Re: [SQL] DROP TABLE should also uncache table
Can your code that can reproduce the problem? On Thu, Apr 16, 2015 at 5:42 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi As per JIRA this issue is resolved, but i am still facing this issue. SPARK-2734 - DROP TABLE should also uncache table -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Microsoft SQL jdbc support from spark sql
Bummer - out of curiosity, if you were to use the classpath.first or perhaps copy the jar to the slaves could that actually do the trick? The latter isn't really all that efficient but just curious if that could do the trick. On Thu, Apr 16, 2015 at 7:14 AM ARose ashley.r...@telarix.com wrote: I take it back. My solution only works when you set the master to local. I get the same error when I try to run it on the cluster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399p22525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
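For what it's worth, a hedged sketch of both ideas as submit-time options (the paths and class are illustrative, and the exact userClassPathFirst property name has varied across Spark versions):

  spark-submit --jars /path/to/sqljdbc4.jar \
    --conf spark.files.userClassPathFirst=true \
    --class com.example.MyJob myjob.jar

--jars ships the JDBC driver to every executor's classpath, which is usually cleaner than hand-copying the jar to the slaves.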
Re: Microsoft SQL jdbc support from spark sql
I take it back. My solution only works when you set the master to local. I get the same error when I try to run it on the cluster. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Microsoft-SQL-jdbc-support-from-spark-sql-tp22399p22525.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[ThriftServer] Urgent -- very slow Metastore query from Spark
Hi Sparkers, hoping for insight here: running a simple describe mytable here where mytable is a partitioned Hive table. Spark produces the following times: Query 1 of 1, Rows read: 50, Elapsed time (seconds) - Total: 73.02, SQL query: 72.831, Reading results: 0.189 Whereas Hive over the same metastore shows: Query 1 of 1, Rows read: 47, Elapsed time (seconds) - Total: 0.44, SQL query: 0.204, Reading results: 0.236 I am looking at the metastore as Thriftserver couldn't start up at all until I increased hive.metastore.client.socket.timeout to 600 Why would metastore access from Spark's Thriftserver be so much worse than from Hive? The issue is pretty urgent for me as I ran into this problem during a push to a production cluster (QA metastore table is smaller and it's a different cluster that didn't show this). Is there a known issue with metastore access -- I only see https://issues.apache.org/jira/browse/SPARK-5923 but I'm using Postgres. We are upgrading from Shark and both Hive and Shark process this a lot faster. Describe table in itself is not a critical query for me but I am experiencing performance hit in other queries and I'm suspecting the metastore interaction (e.g. https://www.mail-archive.com/user@spark.apache.org/msg26242.html)
RE: General configurations on CDH5 to achieve maximum Spark Performance
Essentially, to change the performance yield of a software cluster infrastructure platform like Spark, you play with different permutations of: - Number of CPU cores used by Spark Executors on every cluster node - Amount of RAM allocated for each executor. How disk and network IO is used also plays a role, but that is influenced more by app algorithmic aspects than by YARN / Spark cluster config (except rack awareness etc). When Spark runs under the management of YARN, the above is controlled / allocated by YARN: https://spark.apache.org/docs/latest/running-on-yarn.html From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:21 PM To: Evo Eftimov; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Thanks Evo. Yes, my concern is only regarding the infrastructure configurations. Basically, configuring Yarn (Node manager) + Spark is must and default setting never works. And what really happens, is we make changes as and when an issue is faced because of one of the numerous default configuration settings. And every time, we have to google a lot to decide on the right values J Again, my issue is very centric to running Spark on Yarn in CDH5 environment. If you know a link that talks about optimum configuration settings for running Spark on Yarn (CDH5), please share the same. Thanks, Manish From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 10:38 PM To: Manish Gupta 8; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Well there are a number of performance tuning guidelines in dedicated sections of the spark documentation - have you read and applied them Secondly any performance problem within a distributed cluster environment has two aspects: 1. Infrastructure 2. App Algorithms You seem to be focusing only on 1, but what you said about the performance differences between single laptop and cluster points to potential algorithmic inefficiency in your app when e.g. distributing and performing parallel processing and data. On a single laptop data moves instantly between workers because all worker instances run in the memory of a single machine .. Regards, Evo Eftimov From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:03 PM To: user@spark.apache.org Subject: General configurations on CDH5 to achieve maximum Spark Performance Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did lot of changes (and still doing it) to get decent performance otherwise our 6 node dev cluster with default configurations, lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regards will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material.
Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any (your) computer. ***Please consider the environment before printing this email.***
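A hedged example of those knobs expressed as spark-submit flags on YARN (all values are illustrative, not recommendations):

  spark-submit --master yarn-client \
    --num-executors 6 --executor-cores 3 --executor-memory 10g \
    --driver-memory 4g \
    --class com.example.MyApp myapp.jar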
saveAsTextFile
I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Re: saveAsTextFile
Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: saveAsTextFile
The reason for this is as follows: 1. You are saving data on HDFS 2. HDFS as a cluster/server side Service has a Single Writer / Multiple Reader multithreading model 3. Hence each thread of execution in Spark has to write to a separate file in HDFS 4. Moreover the RDDs are partitioned across cluster nodes and operated upon by multiple threads there, and on top of that in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream. If you want fine / detailed management of the writing to HDFS you can implement your own HDFS adapter and invoke it in foreachRDD and foreach. Regards Evo Eftimov From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
RE: saveAsTextFile
Nop Sir, it is possible - check my reply earlier -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 6:35 PM To: Vadim Bichutskiy Cc: user@spark.apache.org Subject: Re: saveAsTextFile You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: saveAsTextFile
Basically you need to unbundle the elements of the RDD and then store them wherever you want - use foreachPartition and then foreach -Original Message- From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:39 PM To: Sean Owen Cc: user@spark.apache.org Subject: Re: saveAsTextFile Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
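A minimal Scala sketch of that pattern, with hypothetical createConnection/insertIntoRedshift helpers (in practice this is often a batched JDBC insert, or staging to S3 followed by a Redshift COPY):

  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One connection per partition, reused for all of its records.
      val conn = createConnection()   // hypothetical helper
      try records.foreach(rec => insertIntoRedshift(conn, rec))   // hypothetical helper
      finally conn.close()
    }
  }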
Re: saveAsTextFile
Just copy the files? it shouldn't matter that much where they are as you can find them easily. Or consider somehow sending the batches of data straight into Redshift? no idea how that is done but I imagine it's doable. On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: saveAsTextFile
Also, to juggle even further the multithreading model of both Spark and HDFS, you can even publish the data from Spark first to a message broker e.g. Kafka, from where a predetermined number (from 1 to infinity) of parallel consumers will retrieve it and store it in HDFS in one or more finely controlled files and directories. From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:45 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: saveAsTextFile Thanks Evo for your detailed explanation. On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote: The reason for this is as follows: 1. You are saving data on HDFS 2. HDFS as a cluster/server side Service has a Single Writer / Multiple Reader multithreading model 3. Hence each thread of execution in Spark has to write to a separate file in HDFS 4. Moreover the RDDs are partitioned across cluster nodes and operated upon by multiple threads there and on top of that in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream If you want fine / detailed management of the writing to HDFS you can implement your own HDFS adapter and invoke it in foreachRDD and foreach Regards Evo Eftimov From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim
Re: saveAsTextFile
Copy should be doable but I'm not sure how to specify a prefix for the directory while keeping the filename (ie part-0) fixed in copy command. On Apr 16, 2015, at 1:51 PM, Sean Owen so...@cloudera.com wrote: Just copy the files? it shouldn't matter that much where they are as you can find them easily. Or consider somehow sending the batches of data straight into Redshift? no idea how that is done but I imagine it's doable. On Thu, Apr 16, 2015 at 6:38 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks Sean. I want to load each batch into Redshift. What's the best/most efficient way to do that? Vadim On Apr 16, 2015, at 1:35 PM, Sean Owen so...@cloudera.com wrote: You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Random pairs / RDD order
Hi everyone, I have a large RDD and I am trying to create an RDD of a random sample of pairs of elements from this RDD. The elements composing a pair should come from the same partition, for efficiency. The idea I've come up with is to take two random samples and then use zipPartitions to pair each i-th element of the first sample with the i-th element of the second sample. Here is a sample code illustrating the idea:
---
val rdd = sc.parallelize(1 to 6, 16)
val sample1 = rdd.sample(true, 0.01, 42)
val sample2 = rdd.sample(true, 0.01, 43)

def myfunc(s1: Iterator[Int], s2: Iterator[Int]): Iterator[String] = {
  var res = List[String]()
  while (s1.hasNext && s2.hasNext) {
    val x = s1.next + " " + s2.next
    res ::= x
  }
  res.iterator
}

val pairs = sample1.zipPartitions(sample2)(myfunc)
-
However I am not happy with this solution because each element is most likely to be paired with elements that are close by in the partition. This is because sample returns an ordered Iterator. Any idea how to fix this? I did not find a way to efficiently shuffle the random sample so far. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Random-pairs-RDD-order-tp22529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
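One hedged idea, not from the thread: shuffle each sampled partition in memory before zipping. The samples are small (1%), so buffering a partition's worth of elements is cheap, and per-partition seeds keep the result reproducible:

  import scala.util.Random
  import org.apache.spark.rdd.RDD

  // Locally shuffle every partition of an RDD, seeding by partition index.
  def shufflePartitions(rdd: RDD[Int], seed: Long): RDD[Int] =
    rdd.mapPartitionsWithIndex { (i, it) =>
      new Random(seed + i).shuffle(it.toVector).iterator
    }

  val pairs = shufflePartitions(sample1, 1L)
    .zipPartitions(shufflePartitions(sample2, 2L))(myfunc)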
Re: General configurations on CDH5 to achieve maximum Spark Performance
I don't think there's anything specific to CDH that you need to know, other than it ought to set things up sanely for you. Sandy did a couple posts about tuning: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-1/ http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ I don't think there's such a thing as one optimal configuration. It depends very heavily on your workload. First you need to have a look at your app, really. All the tuning in the world isn't going to make an unnecessary shuffle as fast as eliminating it. On Thu, Apr 16, 2015 at 6:02 PM, Manish Gupta 8 mgupt...@sapient.com wrote: Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did lot of changes (and still doing it) to get decent performance otherwise our 6 node dev cluster with default configurations, lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regards will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com The information transmitted is intended only for the person or entity to which it is addressed and may contain confidential and/or privileged material. Any review, retransmission, dissemination or other use of, or taking of any action in reliance upon, this information by persons or entities other than the intended recipient is prohibited. If you received this in error, please contact the sender and delete the material from any (your) computer. ***Please consider the environment before printing this email.*** - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Distinct is very slow
No, I did not try the partitioning. Below is the full code:

public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
  long start = System.currentTimeMillis();
  JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(
    new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
      @Override
      public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
        MatcherReleventData matcherData = new MatcherReleventData();
        return new Tuple2<Long, MatcherReleventData>(t._2,
            matcherData.convertVendorDataToMatcherData(t._1));
      }
    }).cache();
  log.info("after index " + RddForMarch.take(1));

  Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
  Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
  final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);

  JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
    new Function<MatcherReleventData, Iterable<String>>() {
      @Override
      public Iterable<String> call(MatcherReleventData v1) throws Exception {
        List<String> values = new ArrayList<String>();
        HelperUtilities helper1 = new HelperUtilities();
        MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
        if (matchkeys.get_companyName() != null) values.add(matchkeys.get_companyName());
        if (matchkeys.get_phoneNumberr() != null) values.add(matchkeys.get_phoneNumberr());
        if (matchkeys.get_zipCode() != null) values.add(matchkeys.get_zipCode());
        if (matchkeys.getM_domain() != null) values.add(matchkeys.getM_domain());
        return values;
      }
    });
  log.info("blocking RDD is " + blockingRdd.count());
  int count = 0;
  log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {
    log.info(entry._1() + " : " + entry._2());
    count++;
  }
  log.info("total count " + count);

  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
    new Function<String, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> call(String v1) throws Exception {
        return ckdao.getSingelkeyresult(v1);
      }
    }).distinct(32);
  log.info("after hbase count is " + completeDataToprocess.count());
  log.info("data for process " + completeDataToprocess.take(1));

  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
    new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
      @Override
      public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t) throws Exception {
        Scoring scoreObj = new Scoring();
        double score = scoreObj.computeMatchScore(companyDAO.get(t._2()),
            dataMatchGlobal.getValue().get(t._1()));
        Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(), score);
        return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(), maptuple);
      }
    });
  log.info("with score tuple is " + withScore.take(1));

  JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
    new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
      @Override
      public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1, Tuple2<Integer, Double> v2)
          throws Exception {
        int res = v1._2().compareTo(v2._2());
        if (res > 0) {
          return new Tuple2<Integer, Double>(v1._1(), v1._2());
        } else if (res < 0) {
          return new Tuple2<Integer, Double>(v2._1(), v2._2());
        } else {
          return new Tuple2<Integer, Double>(v2._1(), v2._2());
        }
      }
    });
  log.info("max score RDD " + maxScoreRDD.take(10));

  maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
    @Override
    public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
      MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
      log.info("broadcast is " + dataMatchGlobal.getValue().get(t._1()));
      // Set the score for better understanding of merge
      matchedData.setScore(t._2()._2());
      vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), Souce_id);
    }
  });
  log.info("took " + (System.currentTimeMillis() - start) + " mills to run matcher");
}

On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste your complete code? Did you try repartioning/increasing level of parallelism to speed up the processing. Since you have 16 cores, and I'm assuming your 400k records isn't bigger than a 10G dataset. Thanks Best Regards On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote: I already checked and G is taking 1 secs for each task. is this too much? if yes how to avoid this? On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote: Open the driver ui and see which stage is taking time, you can look whether its adding any GC time etc. Thanks Best Regards On Thu, Apr 16, 2015 at 9:56 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have below code whether distinct is running for more time. blockingRdd is the combination
Re: saveAsTextFile
You can't, since that's how it's designed to work. Batches are saved in different files, which are really directories containing partitions, as is common in Hadoop. You can move them later, or just read them where they are. On Thu, Apr 16, 2015 at 6:32 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark on Windows
You could build Spark with Scala 2.11 on Mac / Linux and transfer it over to Windows. AFAIK it should build on Windows too, the only problem is that Maven might take a long time to download dependencies. What errors are you seeing? Matei On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote: We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. However, on Mac/Linux, we are on Scala 2.11.6 (we built Spark from the sources). On Windows, however despite our best efforts we cannot get Spark 1.3.0 as built from sources working for Scala 2.11.6. Spark has too many moving parts and dependencies! When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason that Spark 1.3.0 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.10 bundle. If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share with the group? Thanks, arun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: saveAsTextFile
Thanks Evo for your detailed explanation. On Apr 16, 2015, at 1:38 PM, Evo Eftimov evo.efti...@isecc.com wrote: The reason for this is as follows: 1. You are saving data on HDFS 2. HDFS as a cluster/server side Service has a Single Writer / Multiple Reader multithreading model 3. Hence each thread of execution in Spark has to write to a separate file in HDFS 4. Moreover the RDDs are partitioned across cluster nodes and operated upon by multiple threads there and on top of that in Spark Streaming you have many micro-batch RDDs streaming in all the time as part of a DStream If you want fine / detailed management of the writing to HDFS you can implement your own HDFS adapter and invoke it in forEachRDD and foreach Regards Evo Eftimov From: Vadim Bichutskiy [mailto:vadim.bichuts...@gmail.com] Sent: Thursday, April 16, 2015 6:33 PM To: user@spark.apache.org Subject: saveAsTextFile I am using Spark Streaming where during each micro-batch I output data to S3 using saveAsTextFile. Right now each batch of data is put into its own directory containing 2 objects, _SUCCESS and part-0. How do I output each batch into a common directory? Thanks, Vadim ᐧ
Re: spark.dynamicAllocation.minExecutors
On Thu, Apr 16, 2015 at 12:16:13PM -0700, Marcelo Vanzin wrote: I think Michael is referring to this: Exception in thread main java.lang.IllegalArgumentException: You must specify at least 1 executor! Usage: org.apache.spark.deploy.yarn.Client [options] Yes, sorry, there were too many mins and maxs and I copied the wrong line. Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: [SQL] DROP TABLE should also uncache table
Oh, just noticed that I missed attach... Yeah, your scripts will be helpful. Thanks! On Thu, Apr 16, 2015 at 12:03 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Yes, i am able to reproduce the problem. Do you need the scripts to create the tables? On Thu, Apr 16, 2015 at 10:50 PM, Yin Huai yh...@databricks.com wrote: Can your code that can reproduce the problem? On Thu, Apr 16, 2015 at 5:42 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi As per JIRA this issue is resolved, but i am still facing this issue. SPARK-2734 - DROP TABLE should also uncache table -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
dataframe call, how to control number of tasks for a stage
I have some trouble controlling the number of Spark tasks for a stage. This is on the latest Spark 1.3.x source code build.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
sc.getConf.get("spark.default.parallelism") // set to 10
val t1 = hiveContext.sql("FROM SalesJan2009 select *")
val t2 = t1.groupBy("country", "state", "city").agg(avg("price").as("aprice"))
t1.rdd.partitions.size // got 2
t2.rdd.partitions.size // got 200
First question: why does t2's partition size become 200? Second question: even if I do t2.repartition(10).collect, in some stages it still fires 200 tasks. Thanks, -Neal
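One hedged pointer not stated in the thread: the 200 matches the default of spark.sql.shuffle.partitions, which Spark SQL uses for its shuffles (such as this groupBy/agg) instead of spark.default.parallelism. A sketch of lowering it before building t2; whether it also removes the extra 200-task stages in the second question depends on where those stages come from:

hiveContext.setConf("spark.sql.shuffle.partitions", "10")
val t2 = t1.groupBy("country", "state", "city").agg(avg("price").as("aprice"))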
Re: spark.dynamicAllocation.minExecutors
I think Michael is referring to this: Exception in thread "main" java.lang.IllegalArgumentException: You must specify at least 1 executor! Usage: org.apache.spark.deploy.yarn.Client [options] spark-submit --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.minExecutors=0 ... On Thu, Apr 16, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote: Yes, look what it was before -- it would also reject a minimum of 0. That's the case you are hitting. 0 is a fine minimum. On Thu, Apr 16, 2015 at 8:09 PM, Michael Stone mst...@mathom.us wrote: On Thu, Apr 16, 2015 at 07:47:51PM +0100, Sean Owen wrote: IIRC that was fixed already in 1.3 https://github.com/apache/spark/commit/b2047b55c5fc85de6b63276d8ab9610d2496e08b From that commit: + private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) ... + if (maxNumExecutors == 0) { + throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!") -- Marcelo
Re: spark.dynamicAllocation.minExecutors
On Thu, Apr 16, 2015 at 07:47:51PM +0100, Sean Owen wrote: IIRC that was fixed already in 1.3 https://github.com/apache/spark/commit/b2047b55c5fc85de6b63276d8ab9610d2496e08b From that commit: + private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) ... + if (maxNumExecutors == 0) { + throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
Re: Spark on Windows
Thanks, Matei! We'll try that and let you know if it works. You are correct in inferring that some of the problems we had were with dependencies. We also had problems with the spark-submit scripts. I will get the details from the engineer who worked on the Windows builds and provide them to you. arun On Thu, Apr 16, 2015 at 10:44 AM, Matei Zaharia matei.zaha...@gmail.com wrote: You could build Spark with Scala 2.11 on Mac / Linux and transfer it over to Windows. AFAIK it should build on Windows too; the only problem is that Maven might take a long time to download dependencies. What errors are you seeing? Matei On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote: We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. On Mac/Linux, however, we are on Scala 2.11.6 (we built Spark from the sources). On Windows, despite our best efforts, we cannot get Spark 1.3.0 built from sources working with Scala 2.11.6. Spark has too many moving parts and dependencies! When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason Spark 1.3.0 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.11 bundle? If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share it with the group? Thanks, arun
Re: Re: spark streaming printing no output
empty folders generally means that you just need to increase the window intervals; i.e. Spark Streaming's saveAsTextFiles will save a folder for each interval regardless On Wed, Apr 15, 2015 at 5:03 AM, Shushant Arora shushantaror...@gmail.com wrote: It's printing on console but on HDFS all folders are still empty. On Wed, Apr 15, 2015 at 2:29 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks !! Yes, messages typed on this console are seen on the other console. When I closed the other console, the Spark Streaming job started printing messages on the console. Isn't a message written on a port using netcat available to multiple consumers? On Wed, Apr 15, 2015 at 2:22 PM, bit1...@163.com bit1...@163.com wrote: Looks like the message is consumed by the other console? (can see messages typed on this port from another console.) -- bit1...@163.com From: Shushant Arora shushantaror...@gmail.com Date: 2015-04-15 17:11 To: Akhil Das ak...@sigmoidanalytics.com CC: user@spark.apache.org Subject: Re: spark streaming printing no output When I launched spark-shell using spark-shell --master local[2], same behaviour: no output on console, only timestamps. When I did lines.saveAsTextFiles("hdfslocation", "suffix") I get empty files of 0 bytes on HDFS On Wed, Apr 15, 2015 at 12:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Just make sure you have at least 2 cores available for processing. You can try launching it in local[2] and make sure it's working fine. Thanks Best Regards On Tue, Apr 14, 2015 at 11:41 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am running a spark streaming application but nothing is getting printed on the console. I am doing 1. bin/spark-shell --master clusterMgrUrl 2. import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.Duration import org.apache.spark.streaming.Seconds val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream(hostname,) lines.print() ssc.start() ssc.awaitTermination() Jobs are getting created when I look at the web UI but nothing gets printed on the console. I have started an nc script on hostname port and can see messages typed on this port from another console. Please let me know if I am doing something wrong. -- jay vyas
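A minimal working sketch of the setup this thread converges on (the port, 9999 here, was elided in the thread; at least two local threads are needed, one for the socket receiver and one for processing):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("socket-print")
val ssc = new StreamingContext(conf, Seconds(1))
// run `nc -lk 9999` in a separate terminal and type lines there
ssc.socketTextStream("localhost", 9999).print()
ssc.start()
ssc.awaitTermination()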
Re: Problem with Spark SQL UserDefinedType and sbt assembly
If it fails with sbt-assembly but not without it, then there's always the likelihood of a classpath issue. What dependencies are you rolling up into your assembly jar? On Thu, Apr 16, 2015 at 4:46 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Any ideas ? On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an issue that gets me mad. I wrote a UserDefinedType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write it into a parquet file. When I run my code directly inside IDEA everything works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error:
15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found
{"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
^
at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
at scala.util.Try.getOrElse(Try.scala:77)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Re: spark.dynamicAllocation.minExecutors
On Thu, Apr 16, 2015 at 08:10:54PM +0100, Sean Owen wrote: Yes, look what it was before -- would also reject a minimum of 0. That's the case you are hitting. 0 is a fine minimum. How can 0 be a fine minimum if it's rejected? Changing the value is easy enough, but in general it's nice for defaults to make sense. Mike Stone
Re: Distinct is very slow
Akhil, any thought on this? On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote: No, I did not try the partitioning; below is the full code
public static void matchAndMerge(JavaRDD<VendorRecord> matchRdd, JavaSparkContext jsc) throws IOException {
  long start = System.currentTimeMillis();
  JavaPairRDD<Long, MatcherReleventData> RddForMarch = matchRdd.zipWithIndex().mapToPair(
    new PairFunction<Tuple2<VendorRecord, Long>, Long, MatcherReleventData>() {
      @Override
      public Tuple2<Long, MatcherReleventData> call(Tuple2<VendorRecord, Long> t) throws Exception {
        MatcherReleventData matcherData = new MatcherReleventData();
        return new Tuple2<Long, MatcherReleventData>(t._2, matcherData.convertVendorDataToMatcherData(t._1));
      }
  }).cache();
  log.info("after index " + RddForMarch.take(1));
  Map<Long, MatcherReleventData> tmp = RddForMarch.collectAsMap();
  Map<Long, MatcherReleventData> matchData = new HashMap<Long, MatcherReleventData>(tmp);
  final Broadcast<Map<Long, MatcherReleventData>> dataMatchGlobal = jsc.broadcast(matchData);
  JavaPairRDD<Long, String> blockingRdd = RddForMarch.flatMapValues(
    new Function<MatcherReleventData, Iterable<String>>() {
      @Override
      public Iterable<String> call(MatcherReleventData v1) throws Exception {
        List<String> values = new ArrayList<String>();
        HelperUtilities helper1 = new HelperUtilities();
        MatcherKeys matchkeys = helper1.getBlockinkeys(v1);
        if (matchkeys.get_companyName() != null) { values.add(matchkeys.get_companyName()); }
        if (matchkeys.get_phoneNumberr() != null) { values.add(matchkeys.get_phoneNumberr()); }
        if (matchkeys.get_zipCode() != null) { values.add(matchkeys.get_zipCode()); }
        if (matchkeys.getM_domain() != null) { values.add(matchkeys.getM_domain()); }
        return values;
      }
  });
  log.info("blocking RDD is " + blockingRdd.count());
  int count = 0;
  log.info("Starting printing");
  for (Tuple2<Long, String> entry : blockingRdd.collect()) {
    log.info(entry._1() + " : " + entry._2());
    count++;
  }
  log.info("total count " + count);
  JavaPairRDD<Long, Integer> completeDataToprocess = blockingRdd.flatMapValues(
    new Function<String, Iterable<Integer>>() {
      @Override
      public Iterable<Integer> call(String v1) throws Exception {
        return ckdao.getSingelkeyresult(v1);
      }
  }).distinct(32);
  log.info("after hbase count is " + completeDataToprocess.count());
  log.info("data for process " + completeDataToprocess.take(1));
  JavaPairRDD<Long, Tuple2<Integer, Double>> withScore = completeDataToprocess.mapToPair(
    new PairFunction<Tuple2<Long, Integer>, Long, Tuple2<Integer, Double>>() {
      @Override
      public Tuple2<Long, Tuple2<Integer, Double>> call(Tuple2<Long, Integer> t) throws Exception {
        Scoring scoreObj = new Scoring();
        double score = scoreObj.computeMatchScore(companyDAO.get(t._2()), dataMatchGlobal.getValue().get(t._1()));
        Tuple2<Integer, Double> maptuple = new Tuple2<Integer, Double>(t._2(), score);
        return new Tuple2<Long, Tuple2<Integer, Double>>(t._1(), maptuple);
      }
  });
  log.info("with score tuple is " + withScore.take(1));
  JavaPairRDD<Long, Tuple2<Integer, Double>> maxScoreRDD = withScore.reduceByKey(
    new Function2<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Tuple2<Integer, Double>>() {
      @Override
      public Tuple2<Integer, Double> call(Tuple2<Integer, Double> v1, Tuple2<Integer, Double> v2) throws Exception {
        int res = v1._2().compareTo(v2._2());
        if (res > 0) {
          return new Tuple2<Integer, Double>(v1._1(), v1._2());
        } else if (res < 0) {
          return new Tuple2<Integer, Double>(v2._1(), v2._2());
        } else {
          return new Tuple2<Integer, Double>(v2._1(), v2._2());
        }
      }
  });
  log.info("max score RDD " + maxScoreRDD.take(10));
  maxScoreRDD.foreach(new VoidFunction<Tuple2<Long, Tuple2<Integer, Double>>>() {
    @Override
    public void call(Tuple2<Long, Tuple2<Integer, Double>> t) throws Exception {
      MatcherReleventData matchedData = dataMatchGlobal.getValue().get(t._1());
      log.info("broadcast is " + dataMatchGlobal.getValue().get(t._1()));
      // Set the score for better understanding of merge
      matchedData.setScore(t._2()._2());
      vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(), Souce_id);
    }
  });
  log.info("took " + (System.currentTimeMillis() - start) + " mills to run matcher");
}
On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste your complete code? Did you try repartitioning/increasing the level of parallelism to speed up the processing? Since you have 16 cores, I'm assuming your 400k records isn't bigger than a 10G dataset. Thanks Best Regards On Thu, Apr 16, 2015 at 10:00 PM, Jeetendra Gangele gangele...@gmail.com wrote: I already checked and GC is taking 1 sec for each task. Is this too much? If yes, how to avoid this? On 16 April 2015 at 21:58, Akhil Das ak...@sigmoidanalytics.com wrote: Open the driver ui and see which stage is taking time, you can look whether its adding
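One hedged observation on the code above, in Scala notation for brevity (the thread does not confirm this as the fix): every log.info that calls count(), take(), or collect() launches a separate job over the same lineage, so the un-cached stages behind blockingRdd are recomputed several times, and collect()/collectAsMap() pull everything to the driver. The usual remedies, with a placeholder partition count, in the spirit of Akhil's repartitioning suggestion:

import org.apache.spark.storage.StorageLevel

blockingRdd.persist(StorageLevel.MEMORY_AND_DISK) // reuse it across the count/collect/flatMapValues jobs
val spread = blockingRdd.repartition(64)          // spread the expensive lookup/distinct work over all cores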
Re: dataframe can not find fields after loading from hive
Never mind. I found the solution: val newDataFrame = hc.createDataFrame(hiveLoadedDataFrame.rdd, hiveLoadedDataFrame.schema) which translates to converting the data frame to an RDD and back again to a data frame. Not the prettiest solution, but at least it solves my problems. Thanks, Cesar Flores On Thu, Apr 16, 2015 at 11:17 AM, Cesar Flores ces...@gmail.com wrote: I have a data frame in which I load data from a hive table. And my issue is that the data frame is missing the columns that I need to query. For example: val newdataset = dataset.where(dataset("label") === 1) gives me an error like the following: ERROR yarn.ApplicationMaster: User class threw exception: resolved attributes label missing from label, user_id, ... (the rest of the fields of my table) org.apache.spark.sql.AnalysisException: resolved attributes label missing from label, user_id, ... (the rest of the fields of my table) where we can see that the label field actually exists. I managed to solve this issue by updating my syntax to: val newdataset = dataset.where($"label" === 1) which works. However I cannot use this trick in all my queries. For example, when I try to do a unionAll of two subsets of the same data frame, the error I am getting is that all my fields are missing. Can someone tell me if I need to do some post processing after loading from hive in order to avoid this kind of errors? Thanks -- Cesar Flores
Re: Problem with Spark SQL UserDefinedType and sbt assembly
Any ideas ? On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an issue that gets me mad. I wrote a UserDefinedType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write it into a parquet file. When I run my code directly inside IDEA everything works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error:
15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found
{"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
^
at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
at scala.util.Try.getOrElse(Try.scala:77)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Re: How to join RDD keyValuePairs efficiently
Does this same functionality exist with Java? On 17 April 2015 at 02:23, Evo Eftimov evo.efti...@isecc.com wrote: You can use def partitionBy(partitioner: Partitioner): RDD[(K, V)] Return a copy of the RDD partitioned using the specified partitioner The https://github.com/amplab/spark-indexedrdd stuff looks pretty cool and is something which adds valuable functionality to Spark, e.g. the point lookups, PROVIDED it can be executed from within a function running on worker executors Can somebody from Databricks shed more light here -----Original Message----- From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] Sent: Thursday, April 16, 2015 9:39 PM To: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Evo: "partition the large doc RDD based on the hash function on the key ie the docid" - What API do I use to do this? By the way, loading the entire dataset to memory causes an OutOfMemory problem because it is too large (I only have one machine with 16GB and 4 cores). I found something called IndexedRDD on the web https://github.com/amplab/spark-indexedrdd Has anybody used it? Ningjun -----Original Message----- From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 12:18 PM To: 'Sean Owen'; Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Ningjun, to speed up your current design you can do the following: 1. partition the large doc RDD based on the hash function on the key ie the docid 2. persist the large dataset in memory to be available for subsequent queries without reloading and repartitioning for every search query 3. partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same key 4. run the join - the match is not going to be sequential, it is based on the hash of the key; moreover RDD elements with the same key will be collocated on the same cluster node OR simply go for Sean's suggestion - under the hood it works in a slightly different way - the filter is executed in mappers running in parallel on every node, and also by passing the small doc IDs to each filter (mapper) you essentially replicate them on every node so each mapper instance has its own copy and runs with it when filtering And finally you can prototype both options described above and measure and compare their performance -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 5:02 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to join RDD keyValuePairs efficiently This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set. On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have a solution for this? From: Wang, Ningjun (LNG-NPV) Sent: Tuesday, April 14, 2015 10:41 AM To: user@spark.apache.org Subject: How to join RDD keyValuePairs efficiently I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find the documents by ids quickly.
Currently I used RDD join as follows. First I save the RDD as an object file:
allDocs : RDD[Document] = getDocs() // this RDD contains 7 million Document objects
allDocs.saveAsObjectFile("/temp/allDocs.obj")
Then I wrote a function to find documents by ids:
def findDocumentsByIds(docids: RDD[String]) = {
  // docids contains less than 100 items
  val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
  val idAndDocs = allDocs.keyBy(d => d.id)
  docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
}
I found that this is very slow. I suspect it scans the entire 7 million Document objects in "/temp/allDocs.obj" sequentially to find the desired documents. Is there any efficient way to do this? One option I am thinking of is that instead of storing the RDD[Document] as an object file, I store each document in a separate file with the filename equal to the docid. This way I can find a document quickly by docid. However this means I need to save the RDD to 7 million small files, which will take a very long time to save and may cause IO problems with so many small files. Is there any other way? Ningjun
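A sketch in Scala of Evo's steps 1-4, reusing the names from the thread; the partition count and the MEMORY_AND_DISK level (which lets the 7 million documents spill to disk rather than OOM on a 16GB machine) are placeholders, not from the thread:

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val partitioner = new HashPartitioner(64)
val idAndDocs = allDocs.keyBy(d => d.id)
  .partitionBy(partitioner)              // step 1: hash-partition the big RDD by docid
  .persist(StorageLevel.MEMORY_AND_DISK) // step 2: keep it around across queries
val found = docids.map(id => (id, id))
  .partitionBy(partitioner)              // step 3: co-partition the small RDD the same way
  .join(idAndDocs)                       // step 4: join against collocated partitions
  .map(_._2._2)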
Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs
i believe it is a generalization of some classes inside GraphX, where there was/is a need to keep stuff indexed for random access within each RDD partition On Thu, Apr 16, 2015 at 5:00 PM, Evo Eftimov evo.efti...@isecc.com wrote: Can somebody from Databricks shed more light on this IndexedRDD library? https://github.com/amplab/spark-indexedrdd It seems to come from AMP Labs, and most of the Databricks guys are from there. What is especially interesting is whether the Point Lookup (and the other primitives) can work from within a function (e.g. map) running on executors on worker nodes
Re: ClassCastException processing date fields using spark SQL since 1.3.0
Filed: https://issues.apache.org/jira/browse/SPARK-6967 Shouldn't they be null? Statistics are only used to eliminate partitions that can't possibly hold matching values. So while you are right that this might result in a false positive, it will not result in a wrong answer.
Re: spark.dynamicAllocation.minExecutors
Yes, look what it was before -- it would also reject a minimum of 0. That's the case you are hitting. 0 is a fine minimum. On Thu, Apr 16, 2015 at 8:09 PM, Michael Stone mst...@mathom.us wrote: On Thu, Apr 16, 2015 at 07:47:51PM +0100, Sean Owen wrote: IIRC that was fixed already in 1.3 https://github.com/apache/spark/commit/b2047b55c5fc85de6b63276d8ab9610d2496e08b From that commit: + private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0) ... + if (maxNumExecutors == 0) { + throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
Re: spark.dynamicAllocation.minExecutors
Looks like that message would be triggered if spark.dynamicAllocation.initialExecutors was not set, or 0, if I read this right. Yeah, that might have to be positive. This requires you set initial executors to 1 if you want 0 min executors. Hm, maybe that shouldn't be an error condition in the args parser. I could go either way on that, myself. On Thu, Apr 16, 2015 at 8:17 PM, Michael Stone mst...@mathom.us wrote: On Thu, Apr 16, 2015 at 12:16:13PM -0700, Marcelo Vanzin wrote: I think Michael is referring to this: Exception in thread "main" java.lang.IllegalArgumentException: You must specify at least 1 executor! Usage: org.apache.spark.deploy.yarn.Client [options] Yes, sorry, there were too many mins and maxs and I copied the wrong line. Mike Stone
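A sketch of the workaround Sean describes, hedged since it depends on how the YARN client validates things in 1.3: keep the minimum at 0 but start with one executor so the "at least 1 executor" check passes.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "0")
  .set("spark.dynamicAllocation.initialExecutors", "1") // satisfies the parser check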
Re: Spark SQL query key/value in Map
For a Map type column, fields['driver'] is the syntax to retrieve the map value (in the schema, you can see fields: map). The fields.driver syntax is used for struct types. On Thu, Apr 16, 2015 at 12:37 AM, jc.francisco jc.francisc...@gmail.com wrote: Hi, I'm new with both Cassandra and Spark and am experimenting with what Spark SQL can do, as it will affect my Cassandra data model. What I need is a model that can accept arbitrary fields, similar to Postgres's Hstore. Right now, I'm trying out the map type in Cassandra, but I'm getting the exception below when running my Spark SQL: java.lang.RuntimeException: Can't access nested field in type MapType(StringType,StringType,true) The schema I have now is:
root
 |-- device_id: integer (nullable = true)
 |-- event_date: string (nullable = true)
 |-- fields: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
And my Spark SQL is: SELECT fields from raw_device_data where fields.driver = 'driver1' From what I gather, this should work for a JSON based RDD ( https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html ). Is this not supported for a Cassandra map type?
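A one-line restatement of Yin's fix, using the table and column names from the thread (sqlContext stands in for whatever context the poster uses):

val drivers = sqlContext.sql(
  "SELECT fields FROM raw_device_data WHERE fields['driver'] = 'driver1'")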
Re: Timeout errors from Akka in Spark 1.2.1
Hi Guillaume, Interesting that you brought up shuffle. In fact we are experiencing this issue of shuffle files being left behind and not being cleaned up. Since this is a Spark Streaming application, it is expected to stay up indefinitely, so shuffle files being left behind is a big problem right now. Since our max window size is 6 hours, we have set up a cron job to clean up shuffle files older than 12 hours, otherwise they would eat up all our disk space. Please see the following. It seems the non-cleaning of shuffle files is being documented in 1.3.1. https://github.com/apache/spark/pull/5074/files https://issues.apache.org/jira/browse/SPARK-5836 Also, for some reason, the following issues had been opened but were closed as duplicates of the above documentation bug. https://issues.apache.org/jira/browse/SPARK-3563 https://issues.apache.org/jira/browse/SPARK-4796 https://issues.apache.org/jira/browse/SPARK-6011 Any further insight into how to handle shuffle files will be greatly appreciated. Thanks NB On Fri, Apr 10, 2015 at 12:33 AM, Guillaume Pitel guillaume.pi...@exensa.com wrote: Hi, From my experience, the main causes of timeout are related to file cleanup, for instance after a shuffle. If your parallelism is very high and you didn't set the consolidate option, there are many files created by Spark, and when they are cleaned up, the calls start timing out. So you may find your solution by monitoring the folder where Spark stores its shuffles. Guillaume Thanks TD. I believe that might have been the issue. Will try for a few days after passing in the GC option on the java command line when we start the process. Thanks for your timely help. NB On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das t...@databricks.com wrote: Yes, in local mode the driver and executor will be the same process. And in that case the Java options in the SparkConf configuration will not work. On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote: Since we are running in local mode, won't all the executors be in the same JVM as the driver? Thanks NB On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote: It does take effect on the executors, not on the driver. Which is okay because executors have all the data and therefore have GC issues, not so usually for the driver. If you want to be doubly sure, print the JVM flags (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags) However, the GC I was referring to that initiates the RDD and shuffle cleanup was the GC on the driver. Thought I would clarify. TD On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote: Hi TD, Thanks for the response. Since you mentioned GC, this got me thinking. Given that we are running in local mode (all in a single JVM) for now, does the option spark.executor.extraJavaOptions set to -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all before we use it to create the StreamingContext? I ask because that is what we are doing right now. If not, perhaps we have not been running with Concurrent Mark Sweep at all, and is that recommended instead of forcing GC periodically? Thanks NB On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com wrote: There are a couple of options. Increase the timeout (see Spark configuration). Also see past mails in the mailing list. Another option you may try (I have a gut feeling it may work, but I am not sure) is calling GC on the driver periodically. The cleaning up of stuff is tied to GCing of RDD objects, and regular cleaning may help keep things clean more rigorously rather than in unpredictable bursts of GC activity. Let us know how it works out. TD On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote: I have a standalone and local Spark Streaming process where we are reading inputs using FlumeUtils. Our longest window size is 6 hours. After about a day and a half of running without any issues, we start seeing Timeout errors while cleaning up input blocks. This seems to cause reading from Flume to cease. ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000 org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(driver, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)] at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201) at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385) at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361) at
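A minimal sketch of TD's "call GC on the driver periodically" suggestion; the 10-minute interval is a placeholder, and System.gc() is only a request to the JVM, not a guarantee:

import java.util.{Timer, TimerTask}

val gcTimer = new Timer("driver-gc", true) // daemon timer thread on the driver
gcTimer.schedule(new TimerTask { def run(): Unit = System.gc() }, 0L, 10 * 60 * 1000L)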
Re: [SQL] DROP TABLE should also uncache table
Yes, i am able to reproduce the problem. Do you need the scripts to create the tables? On Thu, Apr 16, 2015 at 10:50 PM, Yin Huai yh...@databricks.com wrote: Can you attach code that can reproduce the problem? On Thu, Apr 16, 2015 at 5:42 AM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: Hi As per JIRA this issue is resolved, but i am still facing it. SPARK-2734 - DROP TABLE should also uncache table -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
When querying ElasticSearch, score is 0
Hi, I have data in my ElasticSearch server. When I query it using the REST interface, I get results and a score for each result, but when I run the same query in Spark using the ElasticSearch API, I get results and metadata, but the score is shown as 0 for each record. My configuration is:
...
val conf = new SparkConf()
  .setMaster("local[6]")
  .setAppName("DBpedia to ElasticSearch")
  .set("es.index.auto.create", "true")
  .set("es.field.read.empty.as.null", "true")
  .set("es.read.metadata", "true")
...
val sc = new SparkContext(conf)
val test = Map("query" -> "{\n \"query\": {\n  \"fuzzy_like_this\": {\n   \"fields\": [\"label\"],\n   \"like_text\": \"102nd Ohio Infantry\" }\n }\n}")
val mYRDD = sc.esRDD("dbpedia/docs", test.get("query").get)
Sample output: Map(id -> http://dbpedia.org/resource/Alert,_Ohio, label -> Alert, Ohio, category -> Unincorporated communities in Ohio, abstract -> Alert is an unincorporated community in southern Morgan Township, Butler County, Ohio, in the United States. It is located about ten miles southwest of Hamilton on Howards Creek, a tributary of the Great Miami River in section 28 of R1ET3N of the Congress Lands. It is three miles west of Shandon and two miles south of Okeana., _metadata -> Map(_index -> dbpedia, _type -> docs, _id -> AUy5aQs7895C6HE5GmG4, _score -> 0.0))
As you can see, _score is 0. Would appreciate any help, Cheers, Andrejs
RE: How to join RDD keyValuePairs efficiently
You can use def partitionBy(partitioner: Partitioner): RDD[(K, V)] Return a copy of the RDD partitioned using the specified partitioner The https://github.com/amplab/spark-indexedrdd stuff looks pretty cool and is something which adds valuable functionality to Spark, e.g. the point lookups, PROVIDED it can be executed from within a function running on worker executors Can somebody from Databricks shed more light here -----Original Message----- From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] Sent: Thursday, April 16, 2015 9:39 PM To: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Evo: "partition the large doc RDD based on the hash function on the key ie the docid" - What API do I use to do this? By the way, loading the entire dataset to memory causes an OutOfMemory problem because it is too large (I only have one machine with 16GB and 4 cores). I found something called IndexedRDD on the web https://github.com/amplab/spark-indexedrdd Has anybody used it? Ningjun -----Original Message----- From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 12:18 PM To: 'Sean Owen'; Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Ningjun, to speed up your current design you can do the following: 1. partition the large doc RDD based on the hash function on the key ie the docid 2. persist the large dataset in memory to be available for subsequent queries without reloading and repartitioning for every search query 3. partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same key 4. run the join - the match is not going to be sequential, it is based on the hash of the key; moreover RDD elements with the same key will be collocated on the same cluster node OR simply go for Sean's suggestion - under the hood it works in a slightly different way - the filter is executed in mappers running in parallel on every node, and also by passing the small doc IDs to each filter (mapper) you essentially replicate them on every node so each mapper instance has its own copy and runs with it when filtering And finally you can prototype both options described above and measure and compare their performance -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 5:02 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to join RDD keyValuePairs efficiently This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set. On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have a solution for this? From: Wang, Ningjun (LNG-NPV) Sent: Tuesday, April 14, 2015 10:41 AM To: user@spark.apache.org Subject: How to join RDD keyValuePairs efficiently I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find the documents by ids quickly.
Currently I used RDD join as follows. First I save the RDD as an object file:
allDocs : RDD[Document] = getDocs() // this RDD contains 7 million Document objects
allDocs.saveAsObjectFile("/temp/allDocs.obj")
Then I wrote a function to find documents by ids:
def findDocumentsByIds(docids: RDD[String]) = {
  // docids contains less than 100 items
  val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
  val idAndDocs = allDocs.keyBy(d => d.id)
  docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
}
I found that this is very slow. I suspect it scans the entire 7 million Document objects in "/temp/allDocs.obj" sequentially to find the desired documents. Is there any efficient way to do this? One option I am thinking of is that instead of storing the RDD[Document] as an object file, I store each document in a separate file with the filename equal to the docid. This way I can find a document quickly by docid. However this means I need to save the RDD to 7 million small files, which will take a very long time to save and may cause IO problems with so many small files. Is there any other way? Ningjun
Re: regarding ZipWithIndex
Can you please guide me on how I can extend RDD and convert it in the way you are suggesting. On 16 April 2015 at 23:46, Jeetendra Gangele gangele...@gmail.com wrote: In type T I already have Object... I have a JavaRDD<Object>, and then I am calling zipWithIndex on this RDD and getting a JavaPairRDD<Object, Long>; on this I am running mapToPair and converting into JavaPairRDD<Long, Object> so that I can use it later for other operations like lookup and join. On 16 April 2015 at 23:42, Ted Yu yuzhih...@gmail.com wrote: The Long in RDD[(T, Long)] is a type parameter. You can create an RDD with Integer as the first type parameter. Cheers On Thu, Apr 16, 2015 at 11:07 AM, Jeetendra Gangele gangele...@gmail.com wrote: Hi Ted. This works for me. But since Long takes 8 bytes here, can I reduce it to 4 bytes? It's just an index and I feel 4 bytes is more than enough. Is there any method which takes Integer or similar for the index? On 13 April 2015 at 01:59, Ted Yu yuzhih...@gmail.com wrote: bq. will return something like JavaPairRDD<Object, Long> The Long component of the pair fits your description of index. What other requirement does zipWithIndex not provide you? Cheers On Sun, Apr 12, 2015 at 1:16 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have a JavaRDD<Object> and I want to convert it to JavaPairRDD<Index, Object>. Index should be unique and it should maintain the order. For the first object it should have 1 and then for the second 2, like that. I tried using zipWithIndex but it will return something like JavaPairRDD<Object, Long>. I want to use this RDD for lookup and join operations later in my workflow, so ordering is important. Regards jeet
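A sketch of trimming the 8-byte Long index to a 4-byte Int while keeping the order, in Scala notation (the Java equivalent is a mapToPair after zipWithIndex); `rdd` stands in for the poster's RDD, and this is only safe while the RDD has fewer than Int.MaxValue elements:

val indexed = rdd.zipWithIndex().map { case (obj, i) => (i.toInt, obj) } // (Int, T) keyed by position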
AMP Lab Indexed RDD - question for Data Bricks AMP Labs
Can somebody from Databricks shed more light on this IndexedRDD library? https://github.com/amplab/spark-indexedrdd It seems to come from AMP Labs, and most of the Databricks guys are from there. What is especially interesting is whether the Point Lookup (and the other primitives) can work from within a function (e.g. map) running on executors on worker nodes
RE: AMP Lab Indexed RDD - question for Data Bricks AMP Labs
Thanks, but we need a firm statement, preferably from somebody at the Spark vendor Databricks, including an answer to the specific question posed by me and an assessment/confirmation of whether this is a production-ready / production-quality library which can be used for general-purpose RDDs, not just inside the context of GraphX. From: Koert Kuipers [mailto:ko...@tresata.com] Sent: Thursday, April 16, 2015 10:31 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs i believe it is a generalization of some classes inside GraphX, where there was/is a need to keep stuff indexed for random access within each RDD partition On Thu, Apr 16, 2015 at 5:00 PM, Evo Eftimov evo.efti...@isecc.com wrote: Can somebody from Databricks shed more light on this IndexedRDD library? https://github.com/amplab/spark-indexedrdd It seems to come from AMP Labs, and most of the Databricks guys are from there. What is especially interesting is whether the Point Lookup (and the other primitives) can work from within a function (e.g. map) running on executors on worker nodes
MLlib - Naive Bayes Problem
I have a big dataset of categories of cars and descriptions of cars. I want to give a description of a car and have the program classify the category of that car, so I decided to use multinomial naive Bayes. I created a unique id for each word and replaced it throughout my category/description data.
//My input
2,25187 15095 22608 28756 17862 29523 499 32681 9830 24957 18993 19501 16596 17953 16596
20,1846 29058 16252 20446 9835
52,16861 808 26785 17874 18993 18993 18993 18269 34157 33811 18437 6004 2791 27923 19141 ...
...
Why do I have errors like:
//Errors
ERROR Executor: Exception in task 0.0 in stage 211.0 (TID 392) java.lang.IndexOutOfBoundsException: 13 not in [-13,13)
ERROR Executor: Exception in task 1.0 in stage 211.0 (TID 393) java.lang.IndexOutOfBoundsException: 17 not in [-17,17)
ERROR TaskSetManager: Task 0 in stage 211.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 211.0 failed 1 times, most recent failure: Lost task 0.0 in stage 211.0 (TID 392, localhost): java.lang.IndexOutOfBoundsException: 13 not in [-13,13)
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
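A hedged guess at the cause, which the thread does not confirm: "13 not in [-13,13)" looks like the error thrown when a sparse vector carries a feature index at or beyond its declared size, which happens if each row sizes its vector by its own word count. A sketch of a fixed-width alternative using hashed term frequencies; treating `data` as an RDD[(Double, Seq[String])] of label and tokenized description is an assumption:

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.regression.LabeledPoint

val tf = new HashingTF(1 << 18)                 // same vector width for every row
val training = data.map { case (label, words) =>
  LabeledPoint(label, tf.transform(words))      // hashed, fixed-size sparse vector
}
val model = NaiveBayes.train(training)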
RE: How to join RDD keyValuePairs efficiently
Evo: "partition the large doc RDD based on the hash function on the key ie the docid" - What API do I use to do this? By the way, loading the entire dataset to memory causes an OutOfMemory problem because it is too large (I only have one machine with 16GB and 4 cores). I found something called IndexedRDD on the web https://github.com/amplab/spark-indexedrdd Has anybody used it? Ningjun -----Original Message----- From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 12:18 PM To: 'Sean Owen'; Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Ningjun, to speed up your current design you can do the following: 1. partition the large doc RDD based on the hash function on the key ie the docid 2. persist the large dataset in memory to be available for subsequent queries without reloading and repartitioning for every search query 3. partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same key 4. run the join - the match is not going to be sequential, it is based on the hash of the key; moreover RDD elements with the same key will be collocated on the same cluster node OR simply go for Sean's suggestion - under the hood it works in a slightly different way - the filter is executed in mappers running in parallel on every node, and also by passing the small doc IDs to each filter (mapper) you essentially replicate them on every node so each mapper instance has its own copy and runs with it when filtering And finally you can prototype both options described above and measure and compare their performance -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 5:02 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to join RDD keyValuePairs efficiently This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set. On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have a solution for this? From: Wang, Ningjun (LNG-NPV) Sent: Tuesday, April 14, 2015 10:41 AM To: user@spark.apache.org Subject: How to join RDD keyValuePairs efficiently I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find the documents by ids quickly.
Ningjun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problem with Spark SQL UserDefinedType and sbt assembly
Here is the list of my dependencies:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion,
  "org.iq80.leveldb" % "leveldb" % "0.7",
  "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
  "com.github.scopt" %% "scopt" % "3.2.0",
  "org.bytedeco.javacpp-presets" % "opencv" % "2.4.11-0.11-SNAPSHOT" classifier "linux-x86_64" classifier "",
  "org.bytedeco.javacpp-presets" % "caffe" % "master-0.11-SNAPSHOT" classifier "linux-x86_64" classifier "",
  "org.bytedeco" % "javacpp" % "0.11-SNAPSHOT",
  "org.scalatest" % "scalatest_2.10" % "2.2.0" % "test")
On Thu, Apr 16, 2015 at 11:16 PM, Richard Marscher rmarsc...@localytics.com wrote: If it fails with sbt-assembly but not without it, then there's always the likelihood of a classpath issue. What dependencies are you rolling up into your assembly jar? On Thu, Apr 16, 2015 at 4:46 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Any ideas ? On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, Here is an issue that gets me mad. I wrote a UserDefinedType in order to be able to store a custom type in a parquet file. In my code I just create a DataFrame with my custom data type and write it into a parquet file. When I run my code directly inside IDEA everything works like a charm. But when I create the assembly jar with sbt assembly and run the same code with spark-submit I get the following error:
15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Unsupported dataType: {"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}, [1.1] failure: `TimestampType' expected but `{' found
{"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}
^
at org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)
at org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)
at scala.util.Try.getOrElse(Try.scala:77)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)
at org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
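One hedged experiment suggested by Richard's classpath point, not a confirmed fix: spark-sql and spark-mllib above are compiled into the assembly while spark-core is "provided", so the fat jar ships a second copy of the SQL classes next to the ones already on spark-submit's classpath. Marking all Spark modules provided keeps the assembly from shadowing them:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"   % sparkVersion % "provided",
  "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided")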
RE: How to join RDD keyValuePairs efficiently
Yes, simply look for partitionBy in the javadoc, e.g. for JavaPairRDD From: Jeetendra Gangele [mailto:gangele...@gmail.com] Sent: Thursday, April 16, 2015 9:57 PM To: Evo Eftimov Cc: Wang, Ningjun (LNG-NPV); user Subject: Re: How to join RDD keyValuePairs efficiently Does this same functionality exist with Java? On 17 April 2015 at 02:23, Evo Eftimov evo.efti...@isecc.com wrote: You can use def partitionBy(partitioner: Partitioner): RDD[(K, V)] Return a copy of the RDD partitioned using the specified partitioner The https://github.com/amplab/spark-indexedrdd stuff looks pretty cool and is something which adds valuable functionality to Spark, e.g. the point lookups, PROVIDED it can be executed from within a function running on worker executors Can somebody from Databricks shed more light here -----Original Message----- From: Wang, Ningjun (LNG-NPV) [mailto:ningjun.w...@lexisnexis.com] Sent: Thursday, April 16, 2015 9:39 PM To: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Evo: "partition the large doc RDD based on the hash function on the key ie the docid" - What API do I use to do this? By the way, loading the entire dataset to memory causes an OutOfMemory problem because it is too large (I only have one machine with 16GB and 4 cores). I found something called IndexedRDD on the web https://github.com/amplab/spark-indexedrdd Has anybody used it? Ningjun -----Original Message----- From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 12:18 PM To: 'Sean Owen'; Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: RE: How to join RDD keyValuePairs efficiently Ningjun, to speed up your current design you can do the following: 1. partition the large doc RDD based on the hash function on the key ie the docid 2. persist the large dataset in memory to be available for subsequent queries without reloading and repartitioning for every search query 3. partition the small doc dataset in the same way - this will result in collocated small and large RDD partitions with the same key 4. run the join - the match is not going to be sequential, it is based on the hash of the key; moreover RDD elements with the same key will be collocated on the same cluster node OR simply go for Sean's suggestion - under the hood it works in a slightly different way - the filter is executed in mappers running in parallel on every node, and also by passing the small doc IDs to each filter (mapper) you essentially replicate them on every node so each mapper instance has its own copy and runs with it when filtering And finally you can prototype both options described above and measure and compare their performance -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Thursday, April 16, 2015 5:02 PM To: Wang, Ningjun (LNG-NPV) Cc: user@spark.apache.org Subject: Re: How to join RDD keyValuePairs efficiently This would be much, much faster if your set of IDs was simply a Set, and you passed that to a filter() call that just filtered in the docs that matched an ID in the set. On Thu, Apr 16, 2015 at 4:51 PM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: Does anybody have a solution for this? From: Wang, Ningjun (LNG-NPV) Sent: Tuesday, April 14, 2015 10:41 AM To: user@spark.apache.org Subject: How to join RDD keyValuePairs efficiently I have an RDD that contains millions of Document objects. Each document has a unique Id that is a string. I need to find the documents by ids quickly.
Currently I use an RDD join, as follows. First I save the RDD as an object file:

val allDocs: RDD[Document] = getDocs() // this RDD contains 7 million Document objects
allDocs.saveAsObjectFile("/temp/allDocs.obj")

Then I wrote a function to find documents by ids:

def findDocumentsByIds(docids: RDD[String]) = {
  // docids contains fewer than 100 items
  val allDocs: RDD[Document] = sc.objectFile[Document]("/temp/allDocs.obj")
  val idAndDocs = allDocs.keyBy(d => d.id)
  docids.map(id => (id, id)).join(idAndDocs).map(t => t._2._2)
}

I found that this is very slow. I suspect it scans the entire 7 million Document objects in /temp/allDocs.obj sequentially to find the desired documents. Is there any efficient way to do this?

One option I am considering: instead of storing the RDD[Document] as an object file, store each document in a separate file with a filename equal to the docid. That way I can find a document quickly by docid. However, this means saving the RDD as 7 million small files, which will take a very long time and may cause IO problems with so many small files. Is there any other way?

Ningjun
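Both options from the thread can be sketched concretely. This is a minimal sketch only, reusing the names from the question (sc, allDocs, docids, and a Document class with an id field are assumed; the partition count of 48 is arbitrary). Option 1 is Sean's broadcast-a-Set-and-filter approach; option 2 is Evo's hash-partition-once-and-persist approach.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Option 1 (Sean): broadcast the small ID set and filter in one parallel pass.
// Every mapper gets its own copy of the set; no shuffle is needed.
val idSet = sc.broadcast(docids.collect().toSet) // docids holds < 100 ids
val matched: RDD[Document] = allDocs.filter(d => idSet.value.contains(d.id))

// Option 2 (Evo): key and hash-partition the big RDD once, persist it, and
// partition each small lookup RDD the same way so the join is co-located.
val partitioner = new HashPartitioner(48)
val idAndDocs = allDocs.keyBy(_.id).partitionBy(partitioner).persist(StorageLevel.MEMORY_ONLY)
val results: RDD[Document] = docids.map(id => (id, id))
  .partitionBy(partitioner)
  .join(idAndDocs)
  .map(_._2._2)

With option 2 the expensive partitionBy and persist happen once, so each subsequent findDocumentsByIds call only shuffles the tiny docids RDD.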
mapPartitions() in Java 8
Hi, how do I convert this script to Java 8 with lambdas? My problem is that the function getactivations() returns a scala.collection.Iterator[Node] to mapPartitions(), which needs a java.util.Iterator<String>... Thanks!

===

// Step 1 - Stub code to copy into Spark Shell
// Load XML files containing device activation records.
// Find the most common device models activated.
import scala.xml._

// Given a partition containing multi-line XML, parse the contents.
// Return an iterator of activation XML nodes contained in the partition.
def getactivations(fileiterator: Iterator[String]): Iterator[Node] = {
  val nodes = XML.loadString(fileiterator.mkString) \\ "activation"
  nodes.toIterator
}

// Get the model name from a device activation record
def getmodel(activation: Node): String = {
  (activation \ "model").text
}

// Step 2 - Read the activation XML files
var filename = "hdfs://localhost/user/cloudera/data/activations/*"
var activations = sc.textFile(filename)

// Step 3 - Parse each partition as a file into an activation XML record
var activationTrees = activations.mapPartitions(getactivations)

// Step 4 - Map each activation record to a device model name
var models = activationTrees.map(getmodel)

// Step 5 - Show the partitioning
println(models.toDebugString)

// Step 6 - Count activations by model
var modelcounts = models.
  map(model => (model, 1)).
  reduceByKey((v1, v2) => v1 + v2)
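One route is to keep the parsing in Scala and add a small shim that converts between the Scala and Java collection types, so the Java 8 side only ever sees java.util types. A hedged sketch, assuming Spark 1.x, where the FlatMapFunction passed to JavaRDD.mapPartitions takes a java.util.Iterator and returns a java.lang.Iterable (later Spark versions changed the return type to Iterator); the ActivationParser name is invented for illustration:

import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
import scala.xml._

// Shim callable from Java 8: the same parsing logic as getactivations(),
// but taking and returning Java collection types.
object ActivationParser {
  def getActivations(lines: JIterator[String]): java.lang.Iterable[Node] = {
    val nodes = XML.loadString(lines.asScala.mkString) \\ "activation"
    nodes.asJava // NodeSeq is a Seq[Node], so asJava yields a java.util.List[Node]
  }
}

From the Java side the call would then be roughly activations.mapPartitions(iter -> ActivationParser.getActivations(iter)).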
Re: Random pairs / RDD order
Hi Aurelien, Sean's solution is nice, but maybe not completely order-free, since pairs will come from the same partition. The easiest/fastest way to do it, in my opinion, is to use a random key instead of a zipWithIndex. Of course you won't be able to ensure uniqueness of the elements within each pair, but maybe you don't care since you're sampling with replacement already?

val a = rdd.sample(...).map{ x => (rand() % k, x) }
val b = rdd.sample(...).map{ x => (rand() % k, x) }

k must be roughly the number of elements you're sampling. You'll have a skewed distribution due to collisions, but I don't think it should hurt too much.

Guillaume

> Hi everyone,
> However I am not happy with this solution because each element is most likely to be paired with elements that are close by in the partition. This is because sample returns an ordered Iterator.

--
Guillaume Pitel, Président - +33(0)626 222 431
eXenSa S.A.S. - http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
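Guillaume's suggestion, fleshed out into a runnable sketch. Everything concrete here (the object name, sample fraction, k, local master) is invented for illustration, and scala.util.Random.nextInt(k) stands in for the rand() % k pseudocode above:

import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

object RandomPairs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("random-pairs").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 1000000)

    val k = 10000 // roughly the expected number of sampled elements (1% of 1M)
    val a = rdd.sample(withReplacement = true, fraction = 0.01).map(x => (Random.nextInt(k), x))
    val b = rdd.sample(withReplacement = true, fraction = 0.01).map(x => (Random.nextInt(k), x))

    // Joining on the random keys matches elements independently of partition
    // order; buckets that collide contribute more than one pair, which is the
    // mild skew mentioned above.
    val pairs = a.join(b).values
    pairs.take(5).foreach(println)
    sc.stop()
  }
}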
Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs
I'm the primary author of IndexedRDD. To answer your questions:

1. Operations on an IndexedRDD partition can only be performed from a task operating on that partition, since doing otherwise would require decentralized coordination between workers, which is difficult in Spark. If you want to perform cross-partition lookups, you'll have to do all the lookups in a batch step, as follows:

val a = IndexedRDD(...)
val b = sc.parallelize(...)
// Perform an operation on b that produces some keys to look up in a
val lookups: RDD[Long] = b.map(...)
// Repartition the desired keys to their appropriate partitions in a and do
// local lookups, returning the corresponding values
val results = a.innerJoin(lookups.map(k => (k, ()))) { (id, v, unit) => v }

2. IndexedRDD originated from GraphX but can be used for general operations as long as they fit within Spark's batch-oriented programming model.

By the way, a new version of IndexedRDD is about to be released. If you decide to use IndexedRDD I'd suggest trying that out, since it provides a cleaner interface, more predictable performance, and support for arbitrary key types: https://github.com/amplab/spark-indexedrdd/pull/4

Ankur http://www.ankurdave.com/

On Thu, Apr 16, 2015 at 2:34 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Thanks, but we need a firm statement, preferably from somebody from the Spark vendor Databricks, including an answer to the specific question posed by me and an assessment/confirmation of whether this is a production-ready, quality library which can be used for general-purpose RDDs, not just inside the context of GraphX.

From: Koert Kuipers [mailto:ko...@tresata.com] Sent: Thursday, April 16, 2015 10:31 PM To: Evo Eftimov Cc: user@spark.apache.org Subject: Re: AMP Lab Indexed RDD - question for Data Bricks AMP Labs

I believe it is a generalization of some classes inside GraphX, where there was/is a need to keep stuff indexed for random access within each RDD partition.

On Thu, Apr 16, 2015 at 5:00 PM, Evo Eftimov evo.efti...@isecc.com wrote:

Can somebody from Databricks shed more light on this IndexedRDD library? https://github.com/amplab/spark-indexedrdd It seems to come from AMP Lab, and most of the Databricks guys are from there. What is especially interesting is whether the point lookup (and the other primitives) can work from within a function (e.g. map) running on executors on worker nodes.
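For readers who want to try the point-lookup primitives Evo asks about, driver-side usage looked roughly like the following at the time, going from the project's README. Treat it as a sketch: the package path and the method set shown (get, multiget, put) are recalled from that README and may differ between IndexedRDD versions.

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

// Build an IndexedRDD from a pair RDD with Long keys; entries are
// hash-partitioned and indexed within each partition.
val pairs = sc.parallelize((1L to 1000000L).map(x => (x, 0)))
val indexed = IndexedRDD(pairs).cache()

// Point lookup and batch lookup, issued from the driver as small jobs that
// touch only the partitions owning the requested keys.
indexed.get(42L)                    // => Some(0)
indexed.multiget(Array(1L, 2L, 3L))

// Functional point update: returns a new IndexedRDD, the original is unchanged.
val updated = indexed.put(42L, 10873)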
RE: ClassCastException processing date fields using spark SQL since 1.3.0
Can you tell us how you created the DataFrame?

From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Friday, April 17, 2015 2:52 AM To: rkrist Cc: user Subject: Re: ClassCastException processing date fields using spark SQL since 1.3.0

Filed: https://issues.apache.org/jira/browse/SPARK-6967

> Shouldn't they be null?

Statistics are only used to eliminate partitions that can't possibly hold matching values. So while you are right that this might result in a false positive, it will not result in a wrong answer.
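Michael's point - that statistics are only ever used to rule partitions out - can be made concrete with a toy check. This illustrates the general min/max pruning idea under invented names; it is not Spark's actual implementation:

// A partition is skipped only when the predicate cannot possibly match its
// [min, max] range; missing or overly wide statistics cause extra reads
// (false positives) but never drop matching rows.
case class ColumnStats(min: Int, max: Int)

def mightContain(stats: Option[ColumnStats], wanted: Int): Boolean = stats match {
  case Some(s) => wanted >= s.min && wanted <= s.max
  case None    => true // no statistics: the partition must be read
}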
Base metrics for Spark Benchmarking.
Hello, we want to tune Spark running on a YARN cluster. The Spark History Server UI shows many metrics, such as:

- GC time
- Task duration
- Shuffle read/write
- Shuffle spill (memory/disk)
- Serialization time (task/result)
- Scheduler delay

Among the above metrics, which are the most important to take as a reference for benchmarking cluster performance?

Thanks,
Bijay
Re: Spark on Windows
The Hadoop support from Hortonworks only actually works with Windows Server - at least as of Spark Summit last year, and AFAIK that has not changed since.

2015-04-16 15:18 GMT-07:00 Dean Wampler deanwamp...@gmail.com:

If you're running Hadoop too, now that Hortonworks supports Spark, you might be able to use their distribution.

Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com

On Thu, Apr 16, 2015 at 2:19 PM, Arun Lists lists.a...@gmail.com wrote:

Thanks, Matei! We'll try that and let you know if it works. You are correct in inferring that some of the problems we had were with dependencies. We also had problems with the spark-submit scripts. I will get the details from the engineer who worked on the Windows builds and provide them to you.

arun

On Thu, Apr 16, 2015 at 10:44 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

You could build Spark with Scala 2.11 on Mac/Linux and transfer it over to Windows. AFAIK it should build on Windows too; the only problem is that Maven might take a long time to download dependencies. What errors are you seeing?

Matei

On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote:

We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. However, on Mac/Linux we are on Scala 2.11.6 (we built Spark from the sources). On Windows, despite our best efforts, we cannot get Spark 1.3.0 built from sources working for Scala 2.11.6. Spark has too many moving parts and dependencies!

When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason Spark 1.3.0 is still built for Scala 2.10 is that Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.11 bundle? If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share it with the group?

Thanks,
arun