Re: SparkSQL LEFT JOIN problem
Hi Can you try select birthday from customer left join profile on customer.account_id = profile.account_id to see if the problems remains on your entire data? Thanks, Liquan On Fri, Oct 10, 2014 at 8:20 AM, invkrh inv...@gmail.com wrote: Hi, I am exploring SparkSQL 1.1.0, I have a problem on LEFT JOIN. Here is the request: select * from customer left join profile on customer.account_id = profile.account_id The two tables' schema are shown as following: // Table: customer root |-- account_id: string (nullable = false) |-- birthday: string (nullable = true) |-- preferstore: string (nullable = true) |-- registstore: string (nullable = true) |-- gender: string (nullable = true) |-- city_name_en: string (nullable = true) |-- register_date: string (nullable = true) |-- zip: string (nullable = true) // Table: profile root |-- account_id: string (nullable = false) |-- card_type: string (nullable = true) |-- card_upgrade_time_black: string (nullable = true) |-- card_upgrade_time_gold: string (nullable = true) However, I have always an exception: Exception in thread main org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: Project [*] Join LeftOuter, Some(('customer.account_id = 'profile.account_id)) Subquery customer SparkLogicalPlan (ExistingRdd [account_id#0,birthday#1,preferstore#2,registstore#3,gender#4,city_name_en#5,register_date#6,zip#7], MappedRDD[5] at map at SQLFetcher.scala:43) Subquery profile SparkLogicalPlan (ExistingRdd [account_id#8,card_type#9,card_upgrade_time_black#10,card_upgrade_time_gold#11], MappedRDD[12] at map at SQLFetcher.scala:43) I was not sure where the problem is. So I create two simple tables to isolate the problem. // table 1 a b c 4 8 9 1 3 4 3 4 5 // table 2 a b c 1 2 3 4 5 6 This time, it works. So the problem might be in data. I have just sampled some lines of input tables to create new ones. This also works. I am so confused. The problem is in the data, but the error messages are not enough to find it (if I am not missing anything.) Some lines of the sampled tables. // Table: customer [50660,1975-06-05 00:00:00.000,13,12,male,ningboshi,2006-12-14 00:00:00.000,] [50666,1984-02-23 00:00:00.000,72,5,Female,beijingshi,2006-12-14 00:00:00.000,100086] [50680,1976-11-25 00:00:00.000,59,5,Female,beijingshi,2006-12-14 00:00:00.000,100022] [85,1971-03-27 00:00:00.000,2,2,Female,shanghaishi,2005-09-20 00:00:00.000,200336] // Table: profile [1144681,3,2010-02-18 00:00:00.000,2013-02-28 00:00:00.000] [50666,2,2010-10-31 00:00:00.000,] [3930657,1,,] [1056365,2,2009-12-29 00:00:00.000,] Any help ? =) Hao -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-LEFT-JOIN-problem-tp16152.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
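For reference, a minimal sketch of the narrowed query Liquan suggests, assuming a SQLContext named sqlContext and that the two SchemaRDDs are already registered as "customer" and "profile" (as in the thread):

    // Select an explicit column instead of * to see whether the
    // "Unresolved attributes: *" error is tied to the star expansion.
    val joined = sqlContext.sql(
      "SELECT birthday FROM customer LEFT JOIN profile " +
      "ON customer.account_id = profile.account_id")
    joined.take(10).foreach(println)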
Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
I am working on a PR to leverage the HashJoin trait code to optimize the Left/Right outer join. It's already been tested locally and will send out the PR soon after some clean up. Thanks, Liquan On Wed, Oct 8, 2014 at 12:09 AM, Matei Zaharia matei.zaha...@gmail.com wrote: I'm pretty sure inner joins on Spark SQL already build only one of the sides. Take a look at ShuffledHashJoin, which calls HashJoin.joinIterators. Only outer joins do both, and it seems like we could optimize it for those that are not full. Matei On Oct 7, 2014, at 11:04 PM, Haopu Wang hw...@qilinsoft.com wrote: Liquan, yes, for full outer join, one hash table on both sides is more efficient. For the left/right outer join, it looks like one hash table should be enought. -- *From:* Liquan Pei [mailto:liquan...@gmail.com liquan...@gmail.com] *Sent:* 2014年9月30日 18:34 *To:* Haopu Wang *Cc:* d...@spark.apache.org; user *Subject:* Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin? Hi Haopu, How about full outer join? One hash table may not be efficient for this case. Liquan On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang hw...@qilinsoft.com wrote: Hi, Liquan, thanks for the response. In your example, I think the hash table should be built on the right side, so Spark can iterate through the left side and find matches in the right side from the hash table efficiently. Please comment and suggest, thanks again! -- *From:* Liquan Pei [mailto:liquan...@gmail.com] *Sent:* 2014年9月30日 12:31 *To:* Haopu Wang *Cc:* d...@spark.apache.org; user *Subject:* Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin? Hi Haopu, My understanding is that the hashtable on both left and right side is used for including null values in result in an efficient manner. If hash table is only built on one side, let's say left side and we perform a left outer join, for each row in left side, a scan over the right side is needed to make sure that no matching tuples for that row on left side. Hope this helps! Liquan On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote: I take a look at HashOuterJoin and it's building a Hashtable for both sides. This consumes quite a lot of memory when the partition is big. And it doesn't reduce the iteration on streamed relation, right? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
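As an illustration of the idea being discussed (a simplified sketch, not Spark SQL's HashOuterJoin or the PR itself): a left outer join only needs a hash table on the right (build) side, while the left side can be streamed and used to probe it.

    // Build a multimap on the right side only, then stream the left side.
    def leftOuterJoin[K, L, R](
        left: Iterator[(K, L)],
        right: Iterator[(K, R)]): Iterator[(K, (L, Option[R]))] = {
      val buildTable = right.toSeq.groupBy(_._1).mapValues(_.map(_._2))
      left.flatMap { case (k, l) =>
        buildTable.get(k) match {
          case Some(rs) => rs.iterator.map(r => (k, (l, Some(r))))
          case None     => Iterator((k, (l, None)))
        }
      }
    }

For a full outer join the unmatched rows on the build side must also be emitted, which is why hashing both sides is the simpler (if more memory-hungry) implementation that the thread is discussing.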
Re: Broadcast Torrent fail - then the job dies
Hi Lewis, For debugging purpose, can you try using HttpBroadCast to see if the error remains? You can enable HttpBroadCast by setting spark.broadcast.factory to org.apache.spark.broadcast.HttpBroadcastFactory in spark conf. Thanks, Liquan On Wed, Oct 8, 2014 at 11:21 AM, Steve Lewis lordjoe2...@gmail.com wrote: I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2 - I repeatedly see the following in my logs. I believe this happens in combineByKey 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast variable 0 failed 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 5.006378813 s 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) - -- Liquan Pei Department of Physics University of Massachusetts Amherst
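A minimal sketch of the configuration change Liquan suggests, assuming the conf is built programmatically (it can equally be set in spark-defaults.conf):

    import org.apache.spark.{SparkConf, SparkContext}

    // Switch from TorrentBroadcast to HttpBroadcast for debugging.
    val conf = new SparkConf()
      .setAppName("BroadcastDebug") // hypothetical app name
      .set("spark.broadcast.factory",
           "org.apache.spark.broadcast.HttpBroadcastFactory")
    val sc = new SparkContext(conf)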
Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?
There is a toDebugString method in rdd that will print a description of this RDD and its recursive dependencies for debugging. Thanks, Liquan On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: My job is not being fault-tolerant (e.g., when there's a fetch failure or something). The lineage of RDDs are constantly updated every iteration. However, I think that when there's a failure, the lineage information is not being correctly reapplied. It goes something like this: val rawRDD = read(...) val repartRDD = rawRDD.repartition(X) val tx1 = repartRDD.map(...) var tx2 = tx1.map(...) while (...) { tx2 = tx1.zip(tx2).map(...) } Is there any way to monitor RDD's lineage, maybe even including? I want to make sure that there's no unexpected things happening. -- Liquan Pei Department of Physics University of Massachusetts Amherst
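A small sketch of how toDebugString can be used to inspect lineage, reusing the RDD name from the question:

    // tx2 is the iteratively rebuilt RDD from the question.
    // toDebugString prints this RDD and its recursive dependencies.
    println(tx2.toDebugString)
    // Printing it inside the loop (hypothetical) shows how the lineage grows
    // with each iteration, which can also hint at when to checkpoint.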
Re: Is RDD partition index consistent?
Hi, The partition information held by the Spark master is not updated after a stage failure. In the case of HDFS, Spark gets partition information from the InputFormat, and if an HDFS data node goes down while Spark is performing the computation for a certain stage, that stage will fail and be resubmitted using the same partition information. With failures like a data node loss, I think the partition index you get when you call mapPartitionsWithIndex is consistent. Hope this helps! Liquan On Mon, Oct 6, 2014 at 12:33 PM, Sung Hwan Chung coded...@cs.stanford.edu wrote: Is the RDD partition index you get when you call mapPartitionsWithIndex consistent under fault-tolerance conditions? I.e. 1. Say index is 1 for one of the partitions when you call data.mapPartitionsWithIndex((index, rows) => ...) // Say index is 1 2. The partition fails (maybe along with a bunch of other partitions). 3. When the partitions get restarted somewhere else, will they retain the same index value, as well as all the lineage arguments? -- Liquan Pei Department of Physics University of Massachusetts Amherst
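For completeness, a sketch of mapPartitionsWithIndex as referenced in the question; data comes from the question and the transformation is a placeholder:

    // index is the partition id; it is determined by the RDD's partitioning,
    // so a resubmitted stage computed from the same partition information
    // should see the same index values.
    val tagged = data.mapPartitionsWithIndex { (index, rows) =>
      rows.map(row => (index, row)) // placeholder transformation
    }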
Fwd: Spark SQL: ArrayIndexOutofBoundsException
-- Forwarded message -- From: Liquan Pei liquan...@gmail.com Date: Thu, Oct 2, 2014 at 3:42 PM Subject: Re: Spark SQL: ArrayIndexOutofBoundsException To: SK skrishna...@gmail.com There is only one place you use index 1. One possible issue is that your may have only one element after your split by \t. Can you try to run the following code to make sure every line has at least two elements? val tusers = sc.textFile(inp_file) .map(_.split(\t)) .filter( x = x.length 2) .count() It should return non zero values if your data contains a line with less than two values Liquan On Thu, Oct 2, 2014 at 3:35 PM, SK skrishna...@gmail.com wrote: Hi, I am trying to extract the number of distinct users from a file using Spark SQL, but I am getting the following error: ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15) java.lang.ArrayIndexOutOfBoundsException: 1 I am following the code in examples/sql/RDDRelation.scala. My code is as follows. The error is appearing when it executes the SQL statement. I am new to Spark SQL. I would like to know how I can fix this issue. thanks for your help. val sql_cxt = new SQLContext(sc) import sql_cxt._ // read the data using th e schema and create a schema RDD val tusers = sc.textFile(inp_file) .map(_.split(\t)) .map(p = TUser(p(0), p(1).trim.toInt)) // register the RDD as a table tusers.registerTempTable(tusers) // get the number of unique users val unique_count = sql_cxt.sql(SELECT COUNT (DISTINCT userid) FROM tusers).collect().head.getLong(0) println(unique_count) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
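A cleaned-up sketch of the check Liquan proposes (the archived formatting lost the quotes and the comparison operator); counting the malformed lines directly is one way to express it, with inp_file taken from the question:

    // Count lines that do not have at least two tab-separated fields.
    // A non-zero count means some line would trigger the
    // ArrayIndexOutOfBoundsException when p(1) is accessed.
    val badLines = sc.textFile(inp_file)
      .map(_.split("\t"))
      .filter(x => x.length < 2)
      .count()
    println(s"lines with fewer than two fields: $badLines")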
Re: Relation between worker memory and executor memory in standalone mode
One indirect way to control the number of cores used in an executor is to set spark.cores.max and set spark.deploy.spreadOut to be true. The scheduler in the standalone cluster then assigns roughly the same number of cores (spark.cores.max/number of worker nodes) to each executor for an application. Liquan On Wed, Oct 1, 2014 at 1:06 PM, Boromir Widas vcsub...@gmail.com wrote: I have not found a way to control the cores yet. This effectively limits the cluster to a single application at a time. A subsequent application shows in the 'WAITING' State on the dashboard. On Wed, Oct 1, 2014 at 2:49 PM, Akshat Aranya aara...@gmail.com wrote: On Wed, Oct 1, 2014 at 11:33 AM, Akshat Aranya aara...@gmail.com wrote: On Wed, Oct 1, 2014 at 11:00 AM, Boromir Widas vcsub...@gmail.com wrote: 1. worker memory caps executor. 2. With default config, every job gets one executor per worker. This executor runs with all cores available to the worker. By the job do you mean one SparkContext or one stage execution within a program? Does that also mean that two concurrent jobs will get one executor each at the same time? Experimenting with this some more, I figured out that an executor takes away spark.executor.memory amount of memory from the configured worker memory. It also takes up all the cores, so even if there is still some memory left, there are no cores left for starting another executor. Is my assessment correct? Is there no way to configure the number of cores that an executor can use? On Wed, Oct 1, 2014 at 11:04 AM, Akshat Aranya aara...@gmail.com wrote: Hi, What's the relationship between Spark worker and executor memory settings in standalone mode? Do they work independently or does the worker cap executor memory? Also, is the number of concurrent executors per worker capped by the number of CPU cores configured for the worker? -- Liquan Pei Department of Physics University of Massachusetts Amherst
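A sketch of the indirect control Liquan describes, assuming a standalone cluster; the app name and core count are placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    // Per-application cap on total cores, set in the application's conf.
    val conf = new SparkConf()
      .setAppName("CoreLimitExample")   // hypothetical app name
      .set("spark.cores.max", "16")     // total cores across the cluster
    val sc = new SparkContext(conf)

    // spark.deploy.spreadOut=true is read by the standalone master, e.g. via
    // SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=true" in spark-env.sh, and
    // makes the master spread those cores across workers instead of packing them.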
Re: still GC overhead limit exceeded after increasing heap space
Hi, How many nodes are in your cluster? It seems to me 64g does not help if each of your nodes doesn't have that much memory. Liquan On Wed, Oct 1, 2014 at 1:37 PM, anny9699 anny9...@gmail.com wrote: Hi, After reading some previous posts about this issue, I have increased the Java heap space to -Xms64g -Xmx64g, but still met the java.lang.OutOfMemoryError: GC overhead limit exceeded error. Does anyone have other suggestions? I am reading about 200 GB of data and my total memory is 120 GB, so I use MEMORY_AND_DISK_SER and Kryo serialization. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/still-GC-overhead-limit-exceeded-after-increasing-heap-space-tp15540.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: still GC overhead limit exceeded after increasing heap space
Can you use spark submit to set the the executor memory? Take a look at https://spark.apache.org/docs/latest/submitting-applications.html. Liquan On Wed, Oct 1, 2014 at 2:21 PM, 陈韵竹 anny9...@gmail.com wrote: Thanks Sean. This is how I set this memory. I set it when I start to run the job java -Xms64g -Xmx64g -cp /root/spark/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/root/scala/lib/scala-library.jar:./target/MyProject.jar MyClass Is there some problem with it? On Wed, Oct 1, 2014 at 2:03 PM, Sean Owen so...@cloudera.com wrote: How are you setting this memory? You may be configuring the wrong process's memory, like the driver and not the executors. On Oct 1, 2014 9:37 PM, anny9699 anny9...@gmail.com wrote: Hi, After reading some previous posts about this issue, I have increased the java heap space to -Xms64g -Xmx64g, but still met the java.lang.OutOfMemoryError: GC overhead limit exceeded error. Does anyone have other suggestions? I am reading a data of 200 GB and my total memory is 120 GB, so I use MEMORY_AND_DISK_SER and kryo serialization. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/still-GC-overhead-limit-exceeded-after-increasing-heap-space-tp15540.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
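A hedged example of the approach Liquan points to, with placeholder sizes and paths; the key point is that executor memory is set through Spark's own options rather than a bare java -Xmx on the driver JVM:

    import org.apache.spark.{SparkConf, SparkContext}

    // Programmatically (values are placeholders):
    val conf = new SparkConf()
      .setAppName("MyClass")                // class name taken from the thread
      .set("spark.executor.memory", "8g")   // placeholder size
    val sc = new SparkContext(conf)

    // Or equivalently on the command line (placeholder paths and sizes):
    //   spark-submit --class MyClass --executor-memory 8g --driver-memory 8g \
    //     ./target/MyProject.jar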
Re: Creating a feature vector from text before using with MLLib
The program computes hashed bi-gram frequencies normalized by the total number of bigrams, then filters out zero values. Hashing is an effective trick for vectorizing features. Take a look at http://en.wikipedia.org/wiki/Feature_hashing Liquan On Wed, Oct 1, 2014 at 2:18 PM, Soumya Simanta soumya.sima...@gmail.com wrote: I'm trying to understand the intuition behind the featurize method that Aaron used in one of his demos. I believe this feature will just work for detecting the character set (i.e., language used). Can someone help? def featurize(s: String): Vector = { val n = 1000 val result = new Array[Double](n) val bigrams = s.sliding(2).toArray for (h <- bigrams.map(_.hashCode % n)) { result(h) += 1.0 / bigrams.length } Vectors.sparse(n, result.zipWithIndex.filter(_._1 != 0).map(_.swap)) } -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
Hi Haopu, How about full outer join? One hash table may not be efficient for this case. Liquan On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang hw...@qilinsoft.com wrote: Hi, Liquan, thanks for the response. In your example, I think the hash table should be built on the right side, so Spark can iterate through the left side and find matches in the right side from the hash table efficiently. Please comment and suggest, thanks again! -- *From:* Liquan Pei [mailto:liquan...@gmail.com] *Sent:* 2014年9月30日 12:31 *To:* Haopu Wang *Cc:* d...@spark.apache.org; user *Subject:* Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin? Hi Haopu, My understanding is that the hashtable on both left and right side is used for including null values in result in an efficient manner. If hash table is only built on one side, let's say left side and we perform a left outer join, for each row in left side, a scan over the right side is needed to make sure that no matching tuples for that row on left side. Hope this helps! Liquan On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote: I take a look at HashOuterJoin and it's building a Hashtable for both sides. This consumes quite a lot of memory when the partition is big. And it doesn't reduce the iteration on streamed relation, right? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: processing large number of files
You can use sc.wholeTextFiles to read a directory of text file. Also, it seems from your code that you are only interested in the current year's count, you can perform a filter before distinct() and perform a reduce to sum up counts. Hope this helps! Liquan On Tue, Sep 30, 2014 at 1:59 PM, SK skrishna...@gmail.com wrote: Hi, I am trying to compute the number of unique users from a year's worth of data. So there are about 300 files and each file is quite large (~GB). I first tried this without a loop by reading all the files in the directory using the glob pattern: sc.textFile(dir/*). But the tasks were stalling and I was getting a Too many open files warning, even though I increased the nofile limit to 500K. The number of shuffle tasks that were being created was more than 200K and they were all generating shuffle files. Setting consolidateFiles to true did not help. So now I am reading the files in a loop as shown in the code below. Now I dont run in to the Too many open files issue. But the countByKey is taking a really long time (more then 15 hours and still ongoing). It appears from the logs that this operation is happening on a single node. From the logs, I am not able to figure out why it is taking so long. Each node has 16 GB memory and the mesos cluster has 16 nodes. I have set spark.serializer to KryoSerializer. I am not running into any out of memory errors so far. Is there some way to improve the performance? Thanks. for (i - 1 to 300) { var f = file + i//name of the file val user_time = sc.textFile(f) .map(line = { val fields = line.split(\t) (fields(11), fields(6)) }) // extract (year-month, user_id) .distinct() .countByKey // group by with year as the key // now convert Map object to RDD in order to output results val ut_rdd = sc.parallelize(user_time.toSeq) // convert to array to extract the count. Need to find if there is an easier way to do this. var ar = ut_rdd.toArray() // aggregate the count for the year ucount = ucount + ar(0)._2 } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/processing-large-number-of-files-tp15429.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
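A sketch of the restructuring Liquan suggests: filter down to the year of interest before distinct and aggregate in one pass instead of collecting a map per file. Field indices follow the original code (field 11 = year-month, field 6 = user_id); dir comes from the question and the target year is a placeholder:

    val targetYear = "2014"                                    // placeholder
    val uniqueUsers = sc.textFile(dir + "/*")
      .map(_.split("\t"))
      .filter(fields => fields(11).startsWith(targetYear))     // filter before distinct
      .map(fields => fields(6))                                // user_id
      .distinct()
      .count()
    println(uniqueUsers)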
Re: memory vs data_size
Hi, By default, 60% of JVM memory is reserved for RDD caching, so in your case, 72GB memory is available for RDDs which means that your total data may fit in memory. You can check the RDD memory statistics via the storage tab in web ui. Hope this helps! Liquan On Tue, Sep 30, 2014 at 4:11 PM, anny9699 anny9...@gmail.com wrote: Hi, Is there a guidance about for a data of certain data size, how much total memory should be needed to achieve a relatively good speed? I have a data of around 200 GB and the current total memory for my 8 machines are around 120 GB. Is that too small to run the data of this big? Even the read in and simple initial processing seems to last forever. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/memory-vs-data-size-tp15443.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: aggregateByKey vs combineByKey
Hi Dave, You can replace groupByKey with reduceByKey to improve performance in some cases. reduceByKey performs map side combine which can reduce Network IO and shuffle size where as groupByKey will not perform map side combine. combineByKey is more general then aggregateByKey. Actually, the implementation of aggregateByKey, reduceByKey and groupByKey is achieved by combineByKey. aggregateByKey is similar to reduceByKey but you can provide initial values when performing aggregation. As the name suggests, aggregateByKey is suitable for compute aggregations for keys, example aggregations such as sum, avg, etc. The rule here is that the extra computation spent for map side combine can reduce the size sent out to other nodes and driver. If your func has satisfies this rule, you probably should use aggregateByKey. combineByKey is more general and you have the flexibility to specify whether you'd like to perform map side combine. However, it is more complex to use. At minimum, you need to implement three functions: createCombiner, mergeValue, mergeCombiners. Hope this helps! Liquan On Sun, Sep 28, 2014 at 11:59 PM, David Rowe davidr...@gmail.com wrote: Hi All, After some hair pulling, I've reached the realisation that an operation I am currently doing via: myRDD.groupByKey.mapValues(func) should be done more efficiently using aggregateByKey or combineByKey. Both of these methods would do, and they seem very similar to me in terms of their function. My question is, what are the differences between these two methods (other than the slight differences in their type signatures)? Under what circumstances should I use one or the other? Thanks Dave -- Liquan Pei Department of Physics University of Massachusetts Amherst
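A short illustration of the three APIs on the same per-key average computation; the sample data is made up and the import is needed for pair RDD functions in this Spark version:

    import org.apache.spark.SparkContext._

    val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

    // reduceByKey: same types in and out, map-side combine for free.
    val sums = pairs.reduceByKey(_ + _)

    // aggregateByKey: zero value plus two functions, handy when the result
    // type differs from the value type, e.g. (sum, count) for an average.
    val sumCounts = pairs.aggregateByKey((0.0, 0L))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))

    // combineByKey: most general, you also supply createCombiner.
    val sumCounts2 = pairs.combineByKey(
      (v: Double) => (v, 1L),
      (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1),
      (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2))

    val avgs = sumCounts.mapValues { case (sum, count) => sum / count }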
Re: Simple Question: Spark Streaming Applications
Hi Saiph, Map is used for transformation on your input RDD. If you don't need transformation of your input, you don't need to use map. Thanks, Liquan On Mon, Sep 29, 2014 at 10:15 AM, Saiph Kappa saiph.ka...@gmail.com wrote: Hi, Do all spark streaming applications use the map operation? or the majority of them? Thanks. -- Liquan Pei Department of Physics University of Massachusetts Amherst
Fwd: about partition number
-- Forwarded message -- From: Liquan Pei liquan...@gmail.com Date: Mon, Sep 29, 2014 at 2:12 PM Subject: Re: about partition number To: anny9699 anny9...@gmail.com The number of cores available in your cluster determines the number of tasks that can be run concurrently. If your data is evenly partitioned, the number of partitions should approximately equal to total_coreNumber. Liquan On Mon, Sep 29, 2014 at 2:01 PM, anny9699 anny9...@gmail.com wrote: Hi, I read the past posts about partition number, but am still a little confused about partitioning strategy. I have a cluster with 8 works and 2 cores for each work. Is it true that the optimal partition number should be 2-4 * total_coreNumber or should approximately equal to total_coreNumber? Or it's the task number that really determines the speed rather then partition number? Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/about-partition-number-tp15362.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: about partition number
Hi Anny, Much more partitions is not recommended in general as that creates a lot of small tasks. All the tasks needs to send to worker nodes for execution. Too many partitions increases task scheduling overhead. Spark uses synchronous execution model which means that all tasks in a stage need to finish before executing the next stage. 2-4 tasks per core keep CPUs busy in cases that some tasks are small and finishes early. Hope this helps! Liquan On Mon, Sep 29, 2014 at 2:17 PM, 陈韵竹 anny9...@gmail.com wrote: Thanks Liquan! I thought about the same thing, but then why people are still using much more partitions than core number? Anny On Mon, Sep 29, 2014 at 2:12 PM, Liquan Pei liquan...@gmail.com wrote: The number of cores available in your cluster determines the number of tasks that can be run concurrently. If your data is evenly partitioned, the number of partitions should approximately equal to total_coreNumber. Liquan On Mon, Sep 29, 2014 at 2:01 PM, anny9699 anny9...@gmail.com wrote: Hi, I read the past posts about partition number, but am still a little confused about partitioning strategy. I have a cluster with 8 works and 2 cores for each work. Is it true that the optimal partition number should be 2-4 * total_coreNumber or should approximately equal to total_coreNumber? Or it's the task number that really determines the speed rather then partition number? Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/about-partition-number-tp15362.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: in memory assumption in cogroup?
Hi Koert, cogroup is a transformation on RDD and it creates a cogroupRDD and then perform some transformations on it. When later an action is called, the compute() method of the cogroupRDD will be called. Roughly speaking, each element in cogroupRDD is fetched one at a time. Thus the contents of the two iterables do not need to fit in memory. Hope this helps! Liq On Mon, Sep 29, 2014 at 4:02 PM, Koert Kuipers ko...@tresata.com wrote: apologies for asking yet again about spark memory assumptions, but i cant seem to keep it in my head. if i use PairRDDFunctions.cogroup, it returns for every key 2 iterables. do the contents of these iterables have to fit in memory? or is the data streamed? -- Liquan Pei Department of Physics University of Massachusetts Amherst
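For reference, a small cogroup example showing the two iterables per key; the toy data is made up:

    import org.apache.spark.SparkContext._

    val left  = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    val right = sc.parallelize(Seq((1, "x"), (3, "y")))

    // Each key maps to a pair of Iterables, one per input RDD; keys missing
    // from one side get an empty Iterable there.
    left.cogroup(right).collect().foreach { case (k, (ls, rs)) =>
      println(s"$k -> left=${ls.mkString(",")} right=${rs.mkString(",")}")
    }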
Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?
Hi Haopu, My understanding is that the hashtable on both left and right side is used for including null values in result in an efficient manner. If hash table is only built on one side, let's say left side and we perform a left outer join, for each row in left side, a scan over the right side is needed to make sure that no matching tuples for that row on left side. Hope this helps! Liquan On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote: I take a look at HashOuterJoin and it's building a Hashtable for both sides. This consumes quite a lot of memory when the partition is big. And it doesn't reduce the iteration on streamed relation, right? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
Fwd: Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction?
-- Forwarded message -- From: Liquan Pei liquan...@gmail.com Date: Fri, Sep 26, 2014 at 1:33 AM Subject: Re: Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction? To: Haopu Wang hw...@qilinsoft.com Hi Haopu, Internally, cactheTable on a schemaRDD is implemented as a cache() on a MapPartitionsRDD. As memory reserved for caching RDDs is controlled by spark.storage.memoryFraction, memory storage of cached schemaRDD is controlled by spark.storage.memoryFraction. Hope this helps! Liquan On Fri, Sep 26, 2014 at 1:04 AM, Haopu Wang hw...@qilinsoft.com wrote: Hi, I'm querying a big table using Spark SQL. I see very long GC time in some stages. I wonder if I can improve it by tuning the storage parameter. The question is: the schemaRDD has been cached with cacheTable() function. So is the cached schemaRDD part of memory storage controlled by the spark.storage.memoryFraction parameter? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
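A sketch tying the two pieces together, with placeholder names; the fraction shown is the default value:

    import org.apache.spark.{SparkConf, SparkContext}

    // spark.storage.memoryFraction controls the share of the heap reserved
    // for cached data (placeholder tuning; 0.6 is the default).
    val conf = new SparkConf()
      .setAppName("CachedTableExample")            // hypothetical app name
      .set("spark.storage.memoryFraction", "0.6")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // cacheTable() caches the SchemaRDD behind the registered table, and that
    // cache lives in the storage fraction configured above.
    sqlContext.cacheTable("myTable")               // hypothetical table name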
Re: MLUtils.loadLibSVMFile error
) scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) scala.collection.TraversableLike$class.map(TraversableLike.scala:244) scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79) org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107) org.apache.spark.rdd.RDD.iterator(RDD.scala:227) org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: RDD of Iterable[String]
Hi Deep, I believe that you are referring to the map for Iterable[String] suppose you have iter:Iterable[String] you can do newIter = iter.map(item = Item + a ) which will create an new Iterable[String] with each element appending an a to all string in iter. Does this answer your question? Liquan On Thu, Sep 25, 2014 at 12:43 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: what should come in the map?? Thanks Liquan for answering me... I really need some help..I am stuck in some thing. On Thu, Sep 25, 2014 at 12:57 PM, Deep Pradhan pradhandeep1...@gmail.com wrote: what should come in the map?? On Wed, Sep 24, 2014 at 10:52 PM, Liquan Pei liquan...@gmail.com wrote: Hi Deep, The Iterable trait in scala has methods like map and reduce that you can use to iterate elements of Iterable[String]. You can also create an Iterator from the Iterable. For example, suppose you have val rdd: RDD[Iterable[String]] you can do rdd.map { x = //x has type Iterable[String] x.map(...) // Process elements in iterable[String] val iter:Iterator[String] = x.iterator while(iter.hasNext) { iter.next() } } Hope this helps! Liquan On Wed, Sep 24, 2014 at 8:21 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Can we iterate over RDD of Iterable[String]? How do we do that? Because the entire Iterable[String] seems to be a single element in the RDD. Thank You -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Liquan Pei Department of Physics University of Massachusetts Amherst
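A compact, cleaned-up sketch of the pattern from this thread (the archived formatting dropped the arrows and quotes); the sample data is made up:

    import org.apache.spark.rdd.RDD

    val rdd: RDD[Iterable[String]] =
      sc.parallelize(Seq(Iterable("a", "b"), Iterable("c")))

    // Map over each Iterable[String] element; here every string gets an "a"
    // appended, as in Liquan's example.
    val appended = rdd.map(iter => iter.map(item => item + "a"))
    appended.collect().foreach(x => println(x.mkString(", ")))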
Re: sortByKey trouble
Hi David, Can you try val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34)+"-"+r(3), (r(4), r(10), r(12))))? That should work. Liquan On Wed, Sep 24, 2014 at 1:29 AM, david david...@free.fr wrote: Hi, Does anybody know how to use sortByKey in Scala on an RDD like: val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34)+"-"+r(3), r(4), r(10), r(12))) because I received an error: sortByKey is not a member of org.apache.spark.rdd.RDD[(String,String,String,String)]. What I am trying to do is sort on the first element. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-trouble-tp14989.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: RDD of Iterable[String]
Hi Deep, The Iterable trait in Scala has methods like map and reduce that you can use to iterate over the elements of an Iterable[String]. You can also create an Iterator from the Iterable. For example, suppose you have val rdd: RDD[Iterable[String]]; you can do rdd.map { x => // x has type Iterable[String] x.map(...) // Process elements in the Iterable[String] val iter: Iterator[String] = x.iterator while (iter.hasNext) { iter.next() } } Hope this helps! Liquan On Wed, Sep 24, 2014 at 8:21 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Can we iterate over an RDD of Iterable[String]? How do we do that? Because the entire Iterable[String] seems to be a single element in the RDD. Thank You -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: MLUtils.loadLibSVMFile error
(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: How to sort rdd filled with existing data structures?
You only need to define an ordering of student, no need to modify the class definition of student. It's like a Comparator class in java. Currently, you have to map the rdd to sort by value. Liquan On Wed, Sep 24, 2014 at 9:52 AM, Sean Owen so...@cloudera.com wrote: See the scaladoc for how to define an implicit ordering to use with sortByKey: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions Off the top of my head, I think this is 90% correct to order by age for example: implicit val studentOrdering: Ordering[Student] = Ordering.by(_.age) On Wed, Sep 24, 2014 at 3:07 PM, Tao Xiao xiaotao.cs@gmail.com wrote: Hi , I have the following rdd : val conf = new SparkConf() .setAppName( Testing Sorting ) val sc = new SparkContext(conf) val L = List( (new Student(XiaoTao, 80, 29), I'm Xiaotao), (new Student(CCC, 100, 24), I'm CCC), (new Student(Jack, 90, 25), I'm Jack), (new Student(Tom, 60, 35), I'm Tom), (new Student(Lucy, 78, 22), I'm Lucy)) val rdd = sc.parallelize(L, 3) where Student is a class defined as follows: class Student(val name:String, val score:Int, val age:Int) { override def toString = name: + name + , score: + score + , age: + age } I want to sort the rdd by key, but when I wrote rdd.sortByKey it complained that No implicit Ordering defined, which means I must extend the class with Ordered and provide a method named compare. The problem is that the class Student is from a third-party library so I cannot change its definition. I'd like to know if there is a sorting method that I can provide it a customized compare function so that it can sort the rdd according to the sorting function I provide. One more question, if I want to sort RDD[(k, v)] by value , do I have to map that rdd so that its key and value exchange their positions in the tuple? Are there any functions that allow us to sort rdd by things other than key ? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
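A sketch pulling the two answers together: an Ordering defined outside the third-party Student class, plus sortBy for the second question about sorting by value. rdd and Student come from the thread; the import is needed for the key-ordered RDD functions in this Spark version:

    import org.apache.spark.SparkContext._

    // Ordering defined externally, no change to Student needed.
    implicit val studentOrdering: Ordering[Student] = Ordering.by(_.age)

    // sortByKey now resolves the implicit ordering.
    val sortedByAge = rdd.sortByKey()

    // For sorting an RDD[(K, V)] by something other than the key, sortBy
    // takes an arbitrary key function, so no tuple swap is required.
    val sortedByScore = rdd.sortBy { case (student, note) => student.score }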
Re: MLUtils.loadLibSVMFile error
) org.apache.spark.rdd.RDD.iterator(RDD.scala:229) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: MLlib, what online(streaming) algorithms are available?
Hi Oleksiy, Right now, only streaming linear regression is available in MLlib. There is work in progress on Streaming K-means and Streaming SVM. Please take a look at the following JIRAs for more information. Streaming K-means https://issues.apache.org/jira/browse/SPARK-3254 Streaming SVM https://issues.apache.org/jira/browse/SPARK-3436 Thanks, Liquan On Tue, Sep 23, 2014 at 8:21 AM, aka.fe2s aka.f...@gmail.com wrote: Hi, I'm looking for available online ML algorithms (ones that improve the model with new streaming data). The only one I found is linear regression. Is there anything else implemented as part of MLlib? Thanks, Oleksiy. -- Liquan Pei Department of Physics University of Massachusetts Amherst
Re: General question on persist
Hi Arun, The intermediate results like keyedRecordPieces will not be materialized. This indicates that if you run partitoned = keyedRecordPieces.partitionBy(KeyPartitioner) partitoned.mapPartitions(doComputation).save() again, the keyedRecordPieces will be re-computed . In this case, cache or persist keyedRecordPieces is a good idea to eliminate unnecessary expensive computation. What you can probably do is keyedRecordPieces = records.flatMap( record = Seq(key, recordPieces)).cache() Which will cache the RDD referenced by keyedRecordPieces in memory. For more options on cache and persist, take a look at http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD. There are two APIs you can use to persist RDDs and one allows you to specify storage level. Thanks, Liquan On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja aahuj...@gmail.com wrote: I have a general question on when persisting will be beneficial and when it won't: I have a task that runs as follow keyedRecordPieces = records.flatMap( record = Seq(key, recordPieces)) partitoned = keyedRecordPieces.partitionBy(KeyPartitioner) partitoned.mapPartitions(doComputation).save() Is there value in having a persist somewhere here? For example if the flatMap step is particularly expensive, will it ever be computed twice when there are no failures? Thanks Arun -- Liquan Pei Department of Physics University of Massachusetts Amherst
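A sketch of the two persist variants mentioned, keeping the placeholder names from the question (records, doComputation); the key/partitioner helpers and output path are hypothetical stand-ins:

    import org.apache.spark.SparkContext._
    import org.apache.spark.storage.StorageLevel

    // Only the persist call is the point here; everything else mirrors the
    // pseudocode from the question.
    val keyedRecordPieces = records
      .flatMap(record => Seq((keyOf(record), piecesOf(record)))) // hypothetical helpers
      .persist(StorageLevel.MEMORY_AND_DISK)                     // or .cache() for MEMORY_ONLY

    val partitioned = keyedRecordPieces.partitionBy(keyPartitioner) // hypothetical Partitioner
    partitioned.mapPartitions(doComputation).saveAsTextFile(outputPath) // placeholder path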
Re: Memory compute-intensive tasks
Hi Ravi, I have seen a similar issue before. You can try to set fs.hdfs.impl.disable.cache to true in your hadoop configuration. For example, suppose your hadoop configuration file is hadoopConf, you can use hadoopConf.setBoolean(fs.hdfs.impl.disable.cache, true) Let me know if that helps. Best, Liquan On Wed, Jul 16, 2014 at 4:56 PM, rpandya r...@iecommerce.com wrote: Matei - I tried using coalesce(numNodes, true), but it then seemed to run too few SNAP tasks - only 2 or 3 when I had specified 46. The job failed, perhaps for unrelated reasons, with some odd exceptions in the log (at the end of this message). But I really don't want to force data movement between nodes. The input data is in HDFS and should already be somewhat balanced among the nodes. We've run this scenario using the simple hadoop jar runner and a custom format jar to break the input into 8-line chunks (paired FASTQ). Ideally I'd like Spark to do the minimum data movement to balance the work, feeding each task mostly from data local to that node. Daniel - that's a good thought, I could invoke a small stub for each task that talks to a single local demon process over a socket, and serializes all the tasks on a given machine. Thanks, Ravi P.S. Log exceptions: 14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve SparkContext in spite of waiting for 10, maxNumTries = 10 Exception in thread main java.lang.NullPointerException at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110) ...and later... 14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal. 14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close() java.io.IOException: Filesystem closed at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707) at org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p9991.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Liquan Pei Department of Physics University of Massachusetts Amherst
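The setting Liquan mentions, written out with the quotes that were lost in the archive; how the Hadoop Configuration is obtained depends on the job, so this is a sketch assuming a SparkContext named sc:

    import org.apache.hadoop.conf.Configuration

    // Disable the shared FileSystem cache so each task gets its own client,
    // which avoids "Filesystem closed" when another task closes the cached instance.
    val hadoopConf: Configuration = sc.hadoopConfiguration
    hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true)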