Re: SparkSQL LEFT JOIN problem

2014-10-10 Thread Liquan Pei
Hi

Can you try
select birthday from customer left join profile on customer.account_id =
profile.account_id
to see if the problem remains on your entire data?
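For reference, here is a minimal sketch (Spark 1.1-era API) of how the two
tables can be registered and the narrower query run. The input paths and the
reduced two-column case classes are placeholders, not your actual schema:

case class Customer(account_id: String, birthday: String)
case class Profile(account_id: String, card_type: String)

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

// Build the two tables from tab-separated files (paths are placeholders)
val customer = sc.textFile("customer.tsv").map(_.split("\t")).map(r => Customer(r(0), r(1)))
val profile = sc.textFile("profile.tsv").map(_.split("\t")).map(r => Profile(r(0), r(1)))
customer.registerTempTable("customer")
profile.registerTempTable("profile")

// Project a single column instead of * to see whether the join itself resolves
sqlContext.sql(
  "select birthday from customer left join profile " +
  "on customer.account_id = profile.account_id").collect().foreach(println)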

Thanks,
Liquan

On Fri, Oct 10, 2014 at 8:20 AM, invkrh inv...@gmail.com wrote:

 Hi,

 I am exploring SparkSQL 1.1.0, I have a problem on LEFT JOIN.

 Here is the request:

 select * from customer left join profile on customer.account_id =
 profile.account_id

 The two tables' schema are shown as following:

 // Table: customer
 root
  |-- account_id: string (nullable = false)
  |-- birthday: string (nullable = true)
  |-- preferstore: string (nullable = true)
  |-- registstore: string (nullable = true)
  |-- gender: string (nullable = true)
  |-- city_name_en: string (nullable = true)
  |-- register_date: string (nullable = true)
  |-- zip: string (nullable = true)

 // Table: profile
 root
  |-- account_id: string (nullable = false)
  |-- card_type: string (nullable = true)
  |-- card_upgrade_time_black: string (nullable = true)
  |-- card_upgrade_time_gold: string (nullable = true)

 However, I always get an exception:

 Exception in thread main
 org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved
 attributes: *, tree:
 Project [*]
  Join LeftOuter, Some(('customer.account_id = 'profile.account_id))
   Subquery customer
SparkLogicalPlan (ExistingRdd

 [account_id#0,birthday#1,preferstore#2,registstore#3,gender#4,city_name_en#5,register_date#6,zip#7],
 MappedRDD[5] at map at SQLFetcher.scala:43)
   Subquery profile
SparkLogicalPlan (ExistingRdd

 [account_id#8,card_type#9,card_upgrade_time_black#10,card_upgrade_time_gold#11],
 MappedRDD[12] at map at SQLFetcher.scala:43)

 I was not sure where the problem was, so I created two simple tables to
 isolate the problem.

 // table 1
 a   b   c
 4   8   9
 1   3   4
 3   4   5

 // table 2
 a   b   c
 1   2   3
 4   5   6

 This time, it works.

 So the problem might be in the data. I just sampled some lines of the input
 tables to create new ones.
 This also works.

 I am so confused. The problem is in the data, but the error messages are not
 enough to locate it (unless I am missing something).

 Some lines of the sampled tables.

 // Table: customer

 [50660,1975-06-05 00:00:00.000,13,12,male,ningboshi,2006-12-14
 00:00:00.000,]
 [50666,1984-02-23 00:00:00.000,72,5,Female,beijingshi,2006-12-14
 00:00:00.000,100086]
 [50680,1976-11-25 00:00:00.000,59,5,Female,beijingshi,2006-12-14
 00:00:00.000,100022]
 [85,1971-03-27 00:00:00.000,2,2,Female,shanghaishi,2005-09-20
 00:00:00.000,200336]


 // Table: profile

 [1144681,3,2010-02-18 00:00:00.000,2013-02-28 00:00:00.000]
 [50666,2,2010-10-31 00:00:00.000,]
 [3930657,1,,]
 [1056365,2,2009-12-29 00:00:00.000,]

 Any help ? =)

 Hao




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-LEFT-JOIN-problem-tp16152.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-10-08 Thread Liquan Pei
I am working on a PR to leverage the HashJoin trait code to optimize the
left/right outer join. It has already been tested locally and I will send out
the PR soon after some cleanup.

Thanks,
Liquan

On Wed, Oct 8, 2014 at 12:09 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 I'm pretty sure inner joins on Spark SQL already build only one of the
 sides. Take a look at ShuffledHashJoin, which calls HashJoin.joinIterators.
 Only outer joins do both, and it seems like we could optimize it for those
 that are not full.

 Matei



 On Oct 7, 2014, at 11:04 PM, Haopu Wang hw...@qilinsoft.com wrote:

 Liquan, yes, for full outer join, one hash table on both sides is more
 efficient.

 For the left/right outer join, it looks like one hash table should be
 enough.

 --
 *From:* Liquan Pei [mailto:liquan...@gmail.com liquan...@gmail.com]
 *Sent:* 2014年9月30日 18:34
 *To:* Haopu Wang
 *Cc:* d...@spark.apache.org; user
 *Subject:* Re: Spark SQL question: why build hashtable for both sides in
 HashOuterJoin?

 Hi Haopu,

 How about full outer join? One hash table may not be efficient for this
 case.

 Liquan

 On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang hw...@qilinsoft.com wrote:
 Hi, Liquan, thanks for the response.

 In your example, I think the hash table should be built on the right
 side, so Spark can iterate through the left side and find matches in the
 right side from the hash table efficiently. Please comment and suggest,
 thanks again!

 --
 *From:* Liquan Pei [mailto:liquan...@gmail.com]
 *Sent:* 2014年9月30日 12:31
 *To:* Haopu Wang
 *Cc:* d...@spark.apache.org; user
 *Subject:* Re: Spark SQL question: why build hashtable for both sides in
 HashOuterJoin?

 Hi Haopu,

 My understanding is that the hash table on both the left and right side is
 used for including null values in the result in an efficient manner. If the
 hash table is only built on one side, let's say the left side, and we perform
 a left outer join, then for each row on the left side, a scan over the right
 side is needed to make sure that there are no matching tuples for that row on
 the left side.

 Hope this helps!
 Liquan

 On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote:

 I take a look at HashOuterJoin and it's building a Hashtable for both
 sides.

 This consumes quite a lot of memory when the partition is big. And it
 doesn't reduce the iteration on streamed relation, right?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst



 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst





-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Broadcast Torrent fail - then the job dies

2014-10-08 Thread Liquan Pei
Hi Lewis,

For debugging purposes, can you try using HttpBroadcast to see if the error
remains? You can enable HttpBroadcast by setting spark.broadcast.factory
to org.apache.spark.broadcast.HttpBroadcastFactory in your Spark conf.
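For example, a minimal sketch of setting it when building the SparkConf (the
app name here is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("broadcast-debug")  // placeholder name
  .set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
val sc = new SparkContext(conf)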

Thanks,
Liquan

On Wed, Oct 8, 2014 at 11:21 AM, Steve Lewis lordjoe2...@gmail.com wrote:

 I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2
 - I repeatedly see
 the following in my logs.

 I believe this happens in combineByKey


 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0
 (TID 3)
 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading
 broadcast variable 0
 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast
 variable 0 failed
 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast
 variable 0 took 5.006378813 s
 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading
 broadcast variable 0
 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage
 0.0 (TID 0)
 java.lang.NullPointerException
 at java.nio.ByteBuffer.wrap(ByteBuffer.java:392)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
 at org.apache.spark.scheduler.Task.run(Task.scala:54)

 -




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Is there a way to look at RDD's lineage? Or debug a fault-tolerance error?

2014-10-08 Thread Liquan Pei
There is a toDebugString method on RDD that returns a description of the RDD
and its recursive dependencies, which you can print for debugging.
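For example, a small sketch (the input path is a placeholder):

val rdd = sc.textFile("hdfs:///some/input")  // placeholder path
  .map(_.length)
  .filter(_ > 0)
println(rdd.toDebugString)  // prints the RDD and its recursive dependencies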

Thanks,
Liquan

On Wed, Oct 8, 2014 at 12:01 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 My job is not being fault-tolerant (e.g., when there's a fetch failure or
 something).

 The lineage of the RDDs is updated every iteration. However, I think that
 when there's a failure, the lineage information is not being correctly
 reapplied.

 It goes something like this:

 val rawRDD = read(...)
 val repartRDD = rawRDD.repartition(X)

 val tx1 = repartRDD.map(...)
 var tx2 = tx1.map(...)

 while (...) {
   tx2 = tx1.zip(tx2).map(...)
 }


 Is there any way to monitor RDD's lineage, maybe even including? I want to
 make sure that there's no unexpected things happening.




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Is RDD partition index consistent?

2014-10-06 Thread Liquan Pei
Hi,

The partition information on the Spark master is not updated after a stage
failure. In the case of HDFS, Spark gets partition information from the
InputFormat, and if a data node in HDFS goes down while Spark is performing
computation for a certain stage, that stage will fail and be resubmitted using
the same partition information. With failures like a data node loss, I think
the partition index you get when you call mapPartitionsWithIndex is consistent.
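For example, a small sketch that tags each record with the index of the
partition it came from (the input path is a placeholder):

val tagged = sc.textFile("hdfs:///some/input")  // placeholder path
  .mapPartitionsWithIndex { (index, rows) =>
    rows.map(row => (index, row))  // pair every record with its partition index
  }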

Hope this helps!
Liquan

On Mon, Oct 6, 2014 at 12:33 PM, Sung Hwan Chung coded...@cs.stanford.edu
wrote:

 Is the RDD partition index you get when you call mapPartitionsWithIndex
 consistent under fault-tolerance conditions?

 I.e.

 1. Say index is 1 for one of the partitions when you call
 data.mapPartitionsWithIndex((index, rows) => ...) // Say index is 1
 2. The partition fails (maybe along with a bunch of other partitions).
 3. When the partitions get restarted somewhere else, will they retain the
 same index value, as well as all the lineage arguments?





-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Fwd: Spark SQL: ArrayIndexOutofBoundsException

2014-10-02 Thread Liquan Pei
-- Forwarded message --
From: Liquan Pei liquan...@gmail.com
Date: Thu, Oct 2, 2014 at 3:42 PM
Subject: Re: Spark SQL: ArrayIndexOutofBoundsException
To: SK skrishna...@gmail.com


There is only one place where you use index 1. One possible issue is that
you may have only one element after you split by \t.

Can you try to run the following code to make sure every line has at least
two elements?

val tusers = sc.textFile(inp_file)
   .map(_.split("\t"))
   .filter(x => x.length < 2)
   .count()

It should return a non-zero value if your data contains a line with fewer
than two values.

Liquan

On Thu, Oct 2, 2014 at 3:35 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I am trying to extract the number of distinct users from a file using Spark
 SQL, but  I am getting  the following error:


 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 15)
 java.lang.ArrayIndexOutOfBoundsException: 1


  I am following the code in examples/sql/RDDRelation.scala. My code is as
 follows. The error is appearing when it executes the SQL statement. I am
 new
 to  Spark SQL. I would like to know how I can fix this issue.

 thanks for your help.


  val sql_cxt = new SQLContext(sc)
  import sql_cxt._

  // read the data using the schema and create a schema RDD
  val tusers = sc.textFile(inp_file)
    .map(_.split("\t"))
    .map(p => TUser(p(0), p(1).trim.toInt))

  // register the RDD as a table
  tusers.registerTempTable("tusers")

  // get the number of unique users
  val unique_count = sql_cxt.sql("SELECT COUNT (DISTINCT userid) FROM tusers")
    .collect().head.getLong(0)

  println(unique_count)






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-ArrayIndexOutofBoundsException-tp15639.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst



-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Relation between worker memory and executor memory in standalone mode

2014-10-01 Thread Liquan Pei
One indirect way to control the number of cores used in an executor is to
set spark.cores.max and set spark.deploy.spreadOut to true. The
scheduler in the standalone cluster then assigns roughly the same number of
cores (spark.cores.max/number of worker nodes) to each executor for an
application.
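For example, a rough sketch on the application side (the app name and core
count are placeholders; spark.deploy.spreadOut is a standalone master setting
and defaults to true):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("capped-cores-app")  // placeholder name
  .set("spark.cores.max", "16")    // total cores for this application across the cluster
val sc = new SparkContext(conf)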

Liquan

On Wed, Oct 1, 2014 at 1:06 PM, Boromir Widas vcsub...@gmail.com wrote:

 I have not found a way to control the cores yet. This effectively limits
 the cluster to a single application at a time. A subsequent application
 shows in the 'WAITING' State on the dashboard.

 On Wed, Oct 1, 2014 at 2:49 PM, Akshat Aranya aara...@gmail.com wrote:



 On Wed, Oct 1, 2014 at 11:33 AM, Akshat Aranya aara...@gmail.com wrote:



 On Wed, Oct 1, 2014 at 11:00 AM, Boromir Widas vcsub...@gmail.com
 wrote:

 1. Worker memory caps executor memory.
 2. With the default config, every job gets one executor per worker. This
 executor runs with all cores available to the worker.

 By the job do you mean one SparkContext or one stage execution within a
 program?  Does that also mean that two concurrent jobs will get one
 executor each at the same time?


 Experimenting with this some more, I figured out that an executor takes
 away spark.executor.memory amount of memory from the configured worker
 memory.  It also takes up all the cores, so even if there is still some
 memory left, there are no cores left for starting another executor.  Is my
 assessment correct? Is there no way to configure the number of cores that
 an executor can use?




 On Wed, Oct 1, 2014 at 11:04 AM, Akshat Aranya aara...@gmail.com
 wrote:

 Hi,

 What's the relationship between Spark worker and executor memory
 settings in standalone mode?  Do they work independently or does the 
 worker
 cap executor memory?

 Also, is the number of concurrent executors per worker capped by the
 number of CPU cores configured for the worker?








-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: still GC overhead limit exceeded after increasing heap space

2014-10-01 Thread Liquan Pei
Hi

How many nodes are in your cluster? It seems to me that 64g does not help if
each of your nodes doesn't have that much memory.

Liquan

On Wed, Oct 1, 2014 at 1:37 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 After reading some previous posts about this issue, I have increased the
 java heap space to -Xms64g -Xmx64g, but still met the
 java.lang.OutOfMemoryError: GC overhead limit exceeded error. Does anyone
 have other suggestions?

 I am reading a data of 200 GB and my total memory is 120 GB, so I use
 MEMORY_AND_DISK_SER and kryo serialization.

 Thanks a lot!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/still-GC-overhead-limit-exceeded-after-increasing-heap-space-tp15540.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: still GC overhead limit exceeded after increasing heap space

2014-10-01 Thread Liquan Pei
Can you use spark-submit to set the executor memory? Take a look at
https://spark.apache.org/docs/latest/submitting-applications.html.
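For example, something along these lines (the class name, master URL, jar
path and memory sizes are placeholders):

./bin/spark-submit \
  --class MyClass \
  --master spark://master:7077 \
  --driver-memory 8g \
  --executor-memory 16g \
  target/MyProject.jar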

Liquan

On Wed, Oct 1, 2014 at 2:21 PM, 陈韵竹 anny9...@gmail.com wrote:

 Thanks Sean. This is how I set this memory. I set it when I start to run
 the job

 java -Xms64g -Xmx64g -cp
 /root/spark/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/root/scala/lib/scala-library.jar:./target/MyProject.jar
 MyClass

 Is there some problem with it?



 On Wed, Oct 1, 2014 at 2:03 PM, Sean Owen so...@cloudera.com wrote:

 How are you setting this memory? You may be configuring the wrong
 process's memory, like the driver and not the executors.
 On Oct 1, 2014 9:37 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 After reading some previous posts about this issue, I have increased the
 java heap space to -Xms64g -Xmx64g, but still met the
 java.lang.OutOfMemoryError: GC overhead limit exceeded error. Does
 anyone
 have other suggestions?

 I am reading a data of 200 GB and my total memory is 120 GB, so I use
 MEMORY_AND_DISK_SER and kryo serialization.

 Thanks a lot!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/still-GC-overhead-limit-exceeded-after-increasing-heap-space-tp15540.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Creating a feature vector from text before using with MLLib

2014-10-01 Thread Liquan Pei
The program computes hashed bigram frequencies normalized by the total
number of bigrams and then filters out zero values. Hashing is an effective
trick for vectorizing features. Take a look at
http://en.wikipedia.org/wiki/Feature_hashing

Liquan

On Wed, Oct 1, 2014 at 2:18 PM, Soumya Simanta soumya.sima...@gmail.com
wrote:

 I'm trying to understand the intuition behind the featurize method that
 Aaron used in one of his demos. I believe this feature will just work for
 detecting the character set (i.e., the language used).

 Can someone help ?


 def featurize(s: String): Vector = {
   val n = 1000
   val result = new Array[Double](n)
   val bigrams = s.sliding(2).toArray

   for (h <- bigrams.map(_.hashCode % n)) {
     result(h) += 1.0 / bigrams.length
   }

   Vectors.sparse(n, result.zipWithIndex.filter(_._1 != 0).map(_.swap))
 }






-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-09-30 Thread Liquan Pei
Hi Haopu,

How about full outer join? One hash table may not be efficient for this
case.

Liquan

On Mon, Sep 29, 2014 at 11:47 PM, Haopu Wang hw...@qilinsoft.com wrote:

Hi, Liquan, thanks for the response.



 In your example, I think the hash table should be built on the right
 side, so Spark can iterate through the left side and find matches in the
 right side from the hash table efficiently. Please comment and suggest,
 thanks again!


  --

 *From:* Liquan Pei [mailto:liquan...@gmail.com]
 *Sent:* 2014年9月30日 12:31
 *To:* Haopu Wang
 *Cc:* d...@spark.apache.org; user
 *Subject:* Re: Spark SQL question: why build hashtable for both sides in
 HashOuterJoin?



 Hi Haopu,



 My understanding is that the hash table on both the left and right side is
 used for including null values in the result in an efficient manner. If the
 hash table is only built on one side, let's say the left side, and we perform
 a left outer join, then for each row on the left side, a scan over the right
 side is needed to make sure that there are no matching tuples for that row on
 the left side.



 Hope this helps!

 Liquan



 On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote:

 I take a look at HashOuterJoin and it's building a Hashtable for both
 sides.

 This consumes quite a lot of memory when the partition is big. And it
 doesn't reduce the iteration on streamed relation, right?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: processing large number of files

2014-09-30 Thread Liquan Pei
You can use sc.wholeTextFiles to read a directory of text files.

Also, it seems from your code that you are only interested in the current
year's count, so you can perform a filter before distinct() and then a
reduce to sum up the counts.
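A rough sketch of the filter-before-distinct idea, counting distinct users
directly instead of summing per-month counts (the glob path, the column
indices from your snippet, and the year literal are assumptions):

val uniqueUsersThisYear = sc.textFile("dir/*")        // placeholder glob
  .map(_.split("\t"))
  .map(fields => (fields(11), fields(6)))             // (year-month, user_id)
  .filter { case (ym, _) => ym.startsWith("2014") }   // keep only the year of interest
  .map { case (_, user) => user }
  .distinct()
  .count()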

Hope this helps!
Liquan


On Tue, Sep 30, 2014 at 1:59 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I am  trying to compute the number of unique users from a year's worth of
 data. So there are about 300 files and each file is quite large (~GB).  I
 first tried this without a loop by reading all the files in the directory
 using the glob pattern sc.textFile("dir/*"). But the tasks were stalling
 and I was getting a "Too many open files" warning, even though I increased
 the nofile limit to 500K.  The number of shuffle tasks that were being
 created was more than 200K and they were all generating shuffle files.
 Setting consolidateFiles to true did not help.

 So now I am reading the files in a loop as shown in the code below. Now I
 don't run into the "Too many open files" issue. But the countByKey is
 taking a really long time (more than 15 hours and still ongoing). It
 appears
 from the logs that this operation is happening on a single node. From the
 logs, I am not able to figure out why it is taking so long. Each node has
 16
 GB memory and the mesos cluster has 16 nodes.  I have set  spark.serializer
 to KryoSerializer.  I am not running into any out of memory errors so far.
 Is there some way to improve the performance? Thanks.

 for (i <- 1 to 300)
 {
   var f = file + i   // name of the file
   val user_time = sc.textFile(f)
     .map(line => {
       val fields = line.split("\t")
       (fields(11), fields(6))
     }) // extract (year-month, user_id)
     .distinct()
     .countByKey  // group by with year as the key

   // now convert the Map object to an RDD in order to output results
   val ut_rdd = sc.parallelize(user_time.toSeq)

   // convert to an array to extract the count. Need to find if there is
   // an easier way to do this.
   var ar = ut_rdd.toArray()

   // aggregate the count for the year
   ucount = ucount + ar(0)._2
 }




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/processing-large-number-of-files-tp15429.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: memory vs data_size

2014-09-30 Thread Liquan Pei
Hi,

By default, 60% of JVM memory is reserved for RDD caching, so in your case
about 72GB of memory is available for RDDs, which means that your total data
may not fit in memory. You can check the RDD memory statistics via the
storage tab in the web UI.

Hope this helps!
Liquan



On Tue, Sep 30, 2014 at 4:11 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 Is there guidance on how much total memory is needed, for data of a certain
 size, to achieve relatively good speed?

 I have data of around 200 GB and the current total memory for my 8
 machines is around 120 GB. Is that too small to run data this big?
 Even the reading in and simple initial processing seem to last forever.

 Thanks a lot!




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/memory-vs-data-size-tp15443.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: aggregateByKey vs combineByKey

2014-09-29 Thread Liquan Pei
Hi Dave,

You can replace groupByKey with reduceByKey to improve performance in some
cases. reduceByKey performs a map-side combine, which can reduce network IO
and shuffle size, whereas groupByKey will not perform a map-side combine.

combineByKey is more general than aggregateByKey. Actually, the
implementations of aggregateByKey, reduceByKey and groupByKey are all built on
combineByKey. aggregateByKey is similar to reduceByKey but you can provide
initial values when performing aggregation.

As the name suggests, aggregateByKey is suitable for computing aggregations
by key, for example aggregations such as sum, avg, etc. The rule here is that
the extra computation spent on the map-side combine should reduce the size of
the data sent to other nodes and to the driver. If your function satisfies
this rule, you probably should use aggregateByKey.

combineByKey is more general, and you have the flexibility to specify
whether you'd like to perform a map-side combine. However, it is more complex
to use. At a minimum, you need to implement three functions: createCombiner,
mergeValue, and mergeCombiners.
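As a small illustration, here is a sketch computing per-key averages both ways
on a made-up pair RDD (assuming a Spark 1.x shell, where the pair-RDD
functions need import org.apache.spark.SparkContext._):

import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0)))

// aggregateByKey: a zero value plus a seqOp and a combOp
val avgAgg = pairs
  .aggregateByKey((0.0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),   // merge a value into the accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2))   // merge two accumulators
  .mapValues { case (sum, count) => sum / count }

// combineByKey: createCombiner, mergeValue, mergeCombiners
val avgComb = pairs
  .combineByKey(
    (v: Double) => (v, 1),
    (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),
    (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, count) => sum / count }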

Hope this helps!
Liquan

On Sun, Sep 28, 2014 at 11:59 PM, David Rowe davidr...@gmail.com wrote:

 Hi All,

 After some hair pulling, I've reached the realisation that an operation I
 am currently doing via:

 myRDD.groupByKey.mapValues(func)

 should be done more efficiently using aggregateByKey or combineByKey. Both
 of these methods would do, and they seem very similar to me in terms of
 their function.

 My question is, what are the differences between these two methods (other
 than the slight differences in their type signatures)? Under what
 circumstances should I use one or the other?

 Thanks

 Dave





-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Simple Question: Spark Streaming Applications

2014-09-29 Thread Liquan Pei
Hi Saiph,

Map is used for transformations on your input RDDs. If you don't need to
transform your input, you don't need to use map.

Thanks,
Liquan

On Mon, Sep 29, 2014 at 10:15 AM, Saiph Kappa saiph.ka...@gmail.com wrote:

 Hi,

 Do all spark streaming applications use the map operation? or the majority
 of them?

 Thanks.




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Fwd: about partition number

2014-09-29 Thread Liquan Pei
-- Forwarded message --
From: Liquan Pei liquan...@gmail.com
Date: Mon, Sep 29, 2014 at 2:12 PM
Subject: Re: about partition number
To: anny9699 anny9...@gmail.com


The number of cores available in your cluster determines the number of
tasks that can be run concurrently.  If your data is evenly partitioned,
the number of partitions should be approximately equal to total_coreNumber.

Liquan

On Mon, Sep 29, 2014 at 2:01 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 I read the past posts about partition number, but am still a little
 confused
 about partitioning strategy.

 I have a cluster with 8 workers and 2 cores for each worker. Is it true
 that the optimal partition number should be 2-4 * total_coreNumber, or
 should it be approximately equal to total_coreNumber? Or is it the task
 number that really determines the speed rather than the partition number?

 Thanks a lot!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/about-partition-number-tp15362.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst



-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: about partition number

2014-09-29 Thread Liquan Pei
Hi Anny,

Many more partitions than cores is not recommended in general, as that
creates a lot of small tasks. All tasks need to be sent to worker nodes for
execution, so too many partitions increases the task-scheduling overhead.

Spark uses a synchronous execution model, which means that all tasks in a
stage need to finish before the next stage executes. 2-4 tasks per core keep
the CPUs busy in cases where some tasks are small and finish early.

Hope this helps!
Liquan

On Mon, Sep 29, 2014 at 2:17 PM, 陈韵竹 anny9...@gmail.com wrote:

 Thanks Liquan! I thought about the same thing, but then why are people
 still using many more partitions than the core number?

 Anny

 On Mon, Sep 29, 2014 at 2:12 PM, Liquan Pei liquan...@gmail.com wrote:

 The number of cores available in your cluster determines the number of
 tasks that can be run concurrently.  If your data is evenly partitioned,
 the number of partitions should be approximately equal to total_coreNumber.

 Liquan

 On Mon, Sep 29, 2014 at 2:01 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 I read the past posts about partition number, but am still a little
 confused
 about partitioning strategy.

 I have a cluster with 8 workers and 2 cores for each worker. Is it true
 that the optimal partition number should be 2-4 * total_coreNumber, or
 should it be approximately equal to total_coreNumber? Or is it the task
 number that really determines the speed rather than the partition number?

 Thanks a lot!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/about-partition-number-tp15362.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst





-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: in memory assumption in cogroup?

2014-09-29 Thread Liquan Pei
Hi Koert,

cogroup is a transformation on RDDs: it creates a CoGroupedRDD and then
performs some transformations on it. When an action is later called, the
compute() method of the CoGroupedRDD will be called. Roughly speaking, each
element of the CoGroupedRDD is fetched one at a time. Thus the contents of the
two iterables do not need to fit in memory.

Hope this helps!
Liq

On Mon, Sep 29, 2014 at 4:02 PM, Koert Kuipers ko...@tresata.com wrote:

 Apologies for asking yet again about Spark memory assumptions, but I can't
 seem to keep it in my head.

 If I use PairRDDFunctions.cogroup, it returns 2 iterables for every key.
 Do the contents of these iterables have to fit in memory? Or is the data
 streamed?




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Spark SQL question: why build hashtable for both sides in HashOuterJoin?

2014-09-29 Thread Liquan Pei
Hi Haopu,

My understanding is that the hash table on both the left and right side is
used for including null values in the result in an efficient manner. If the
hash table is only built on one side, let's say the left side, and we perform
a left outer join, then for each row on the left side, a scan over the right
side is needed to make sure that there are no matching tuples for that row on
the left side.

Hope this helps!
Liquan

On Mon, Sep 29, 2014 at 8:36 PM, Haopu Wang hw...@qilinsoft.com wrote:

 I take a look at HashOuterJoin and it's building a Hashtable for both
 sides.

 This consumes quite a lot of memory when the partition is big. And it
 doesn't reduce the iteration on streamed relation, right?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Fwd: Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction?

2014-09-26 Thread Liquan Pei
-- Forwarded message --
From: Liquan Pei liquan...@gmail.com
Date: Fri, Sep 26, 2014 at 1:33 AM
Subject: Re: Spark SQL question: is cached SchemaRDD storage controlled by
spark.storage.memoryFraction?
To: Haopu Wang hw...@qilinsoft.com


Hi Haopu,

Internally, cacheTable on a SchemaRDD is implemented as a cache() on a
MapPartitionsRDD. Since the memory reserved for caching RDDs is controlled by
spark.storage.memoryFraction, the memory storage of a cached SchemaRDD is also
controlled by spark.storage.memoryFraction.
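For example, a minimal sketch (the app name, table name and fraction are
placeholders; the table is assumed to be registered already):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cache-table-example")           // placeholder name
  .set("spark.storage.memoryFraction", "0.5")  // tune the fraction reserved for caching
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

sqlContext.cacheTable("myTable")  // cached columnar data counts against the storage fraction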

Hope this helps!
Liquan

On Fri, Sep 26, 2014 at 1:04 AM, Haopu Wang hw...@qilinsoft.com wrote:

 Hi, I'm querying a big table using Spark SQL. I see very long GC time in
 some stages. I wonder if I can improve it by tuning the storage
 parameter.

 The question is: the schemaRDD has been cached with cacheTable()
 function. So is the cached schemaRDD part of memory storage controlled
 by the spark.storage.memoryFraction parameter?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst



-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: MLUtils.loadLibSVMFile error

2014-09-25 Thread Liquan Pei
)
  
  
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  
   scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  
   org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:79)
  
   org.apache.spark.mllib.util.MLUtils$$anonfun$4.apply(MLUtils.scala:76)
   scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
   scala.collection.Iterator$class.foreach(Iterator.scala:727)
   scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  
  
 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  
  
 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
   org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
   org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
   org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
   org.apache.spark.scheduler.Task.run(Task.scala:51)
  
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
  
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   java.lang.Thread.run(Thread.java:744)
   Driver stacktrace:
   at
   org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
   at
  
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at
  
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
   at scala.Option.foreach(Option.scala:236)
   at
  
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
   at
  
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
  
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
  
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
  
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: RDD of Iterable[String]

2014-09-25 Thread Liquan Pei
Hi Deep,

I believe that you are referring to the map method on Iterable[String].

Suppose you have

iter: Iterable[String]

you can do

val newIter = iter.map(item => item + "a")

which will create a new Iterable[String] by appending an "a" to each string
in iter.

Does this answer your question?

Liquan

On Thu, Sep 25, 2014 at 12:43 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 what should come in the map??
 Thanks Liquan for answering me...
 I really need some help..I am stuck in some thing.


 On Thu, Sep 25, 2014 at 12:57 PM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 what should come in the map??

 On Wed, Sep 24, 2014 at 10:52 PM, Liquan Pei liquan...@gmail.com wrote:

 Hi Deep,

  The Iterable trait in Scala has methods like map and reduce that you can
  use to iterate over the elements of an Iterable[String]. You can also create
  an Iterator from the Iterable. For example, suppose you have

 val rdd: RDD[Iterable[String]]

 you can do

  rdd.map { x => // x has type Iterable[String]
    x.map(...) // Process elements in Iterable[String]
    val iter: Iterator[String] = x.iterator
    while (iter.hasNext) {
      iter.next()
    }
  }

 Hope this helps!

 Liquan

 On Wed, Sep 24, 2014 at 8:21 AM, Deep Pradhan pradhandeep1...@gmail.com
  wrote:

 Can we iterate over RDD of Iterable[String]? How do we do that?
 Because the entire Iterable[String] seems to be a single element in the
 RDD.

 Thank You




 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst






-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: sortByKey trouble

2014-09-24 Thread Liquan Pei
Hi David,

Can you try val rddToSave = file.map(l => l.split("\\|")).map(r =>
(r(34) + "-" + r(3), (r(4), r(10), r(12)))) ?

That should work.

Liquan



On Wed, Sep 24, 2014 at 1:29 AM, david david...@free.fr wrote:

 Hi,

   Does anybody know how to use sortByKey in Scala on an RDD like:

   val rddToSave = file.map(l => l.split("\\|")).map(r => (r(34) + "-" + r(3),
 r(4), r(10), r(12)))

   because I received an error "sortByKey is not a member of
 org.apache.spark.rdd.RDD[(String,String,String,String)]".

 What I am trying to do is sort on the first element.


 Thank's







 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/sortByKey-trouble-tp14989.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: RDD of Iterable[String]

2014-09-24 Thread Liquan Pei
Hi Deep,

The Iterable trait in Scala has methods like map and reduce that you can
use to iterate over the elements of an Iterable[String]. You can also create
an Iterator from the Iterable. For example, suppose you have

val rdd: RDD[Iterable[String]]

you can do

rdd.map { x => // x has type Iterable[String]
   x.map(...) // Process elements in Iterable[String]
   val iter: Iterator[String] = x.iterator
   while (iter.hasNext) {
     iter.next()
   }
}

Hope this helps!

Liquan

On Wed, Sep 24, 2014 at 8:21 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Can we iterate over RDD of Iterable[String]? How do we do that?
 Because the entire Iterable[String] seems to be a single element in the
 RDD.

 Thank You




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: MLUtils.loadLibSVMFile error

2014-09-24 Thread Liquan Pei
(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: How to sort rdd filled with existing data structures?

2014-09-24 Thread Liquan Pei
You only need to define an Ordering for Student; there is no need to modify
the class definition of Student. It's like a Comparator class in Java.

Currently, you have to map the RDD to sort by value.
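For example, a sketch along the lines of Sean's suggestion below, reusing the
rdd: RDD[(Student, String)] from your snippet (sorting by value just swaps the
pair, sorts, and swaps back):

import org.apache.spark.SparkContext._  // pair-RDD functions in Spark 1.x

implicit val studentOrdering: Ordering[Student] = Ordering.by(_.age)

val byKey = rdd.sortByKey()                            // uses the implicit Ordering[Student]
val byValue = rdd.map(_.swap).sortByKey().map(_.swap)  // sort by the String value instead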

Liquan

On Wed, Sep 24, 2014 at 9:52 AM, Sean Owen so...@cloudera.com wrote:

 See the scaladoc for how to define an implicit ordering to use with
 sortByKey:


 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions

 Off the top of my head, I think this is 90% correct to order by age for
 example:

 implicit val studentOrdering: Ordering[Student] = Ordering.by(_.age)

 On Wed, Sep 24, 2014 at 3:07 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:
  Hi ,
 
  I have the following rdd :
 
     val conf = new SparkConf()
       .setAppName("Testing Sorting")
     val sc = new SparkContext(conf)
 
     val L = List(
       (new Student("XiaoTao", 80, 29), "I'm Xiaotao"),
       (new Student("CCC", 100, 24), "I'm CCC"),
       (new Student("Jack", 90, 25), "I'm Jack"),
       (new Student("Tom", 60, 35), "I'm Tom"),
       (new Student("Lucy", 78, 22), "I'm Lucy"))
 
     val rdd = sc.parallelize(L, 3)
 
 
  where Student is a class defined as follows:
 
  class Student(val name: String, val score: Int, val age: Int) {
 
    override def toString =
      "name:" + name + ", score:" + score + ", age:" + age
 
  }
 
 
 
  I want to sort the rdd by key, but when I wrote rdd.sortByKey it
 complained
  that No implicit Ordering defined, which means I must extend the class
  with Ordered and provide a method named  compare.  The problem is that
 the
  class Student is from a third-party library so I cannot change its
  definition. I'd like to know if there is a sorting method that I can
 provide
  it a customized compare function so that it can sort the rdd according to
  the sorting function I provide.
 
  One more question, if I want to sort RDD[(k, v)] by value , do I have to
 map
  that rdd so that its key and value exchange their positions in the tuple?
  Are there any functions that allow us to sort rdd by things other than
 key ?
 
  Thanks
 
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: MLUtils.loadLibSVMFile error

2014-09-24 Thread Liquan Pei
)
   org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
   org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
   org.apache.spark.scheduler.Task.run(Task.scala:51)
  
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
  
  
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  
  
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   java.lang.Thread.run(Thread.java:744)
   Driver stacktrace:
   at
   org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
   at
  
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at
  
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
   at
  
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
   at scala.Option.foreach(Option.scala:236)
   at
  
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
   at
  
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
  
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
  
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
  
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: MLlib, what online(streaming) algorithms are available?

2014-09-23 Thread Liquan Pei
Hi Oleksiy,

Right now, only streaming linear regression is available in MLlib. There is
work in progress on Streaming K-means and Streaming SVM. Please take a look
at the following JIRAs for more information.

Streaming K-means https://issues.apache.org/jira/browse/SPARK-3254
Streaming SVM https://issues.apache.org/jira/browse/SPARK-3436

Thanks,
Liquan

On Tue, Sep 23, 2014 at 8:21 AM, aka.fe2s aka.f...@gmail.com wrote:

 Hi,

 I'm looking for available online ML algorithms (that improve model with
 new streaming data). The only one I found is linear regression.
 Is there anything else implemented as part of MLlib?

 Thanks, Oleksiy.




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: General question on persist

2014-09-23 Thread Liquan Pei
Hi Arun,

The intermediate results like keyedRecordPieces will not be materialized.
This means that if you run

partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)

partitioned.mapPartitions(doComputation).save()

again, keyedRecordPieces will be recomputed. In this case, caching or
persisting keyedRecordPieces is a good idea to eliminate unnecessary expensive
computation. What you can probably do is

keyedRecordPieces = records.flatMap(record => Seq(key, recordPieces)).cache()

which will cache the RDD referenced by keyedRecordPieces in memory. For
more options on cache and persist, take a look at
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD.
There are two persist APIs on RDD, and one of them allows you to specify the
storage level.
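For example, a minimal sketch with a made-up pair RDD standing in for
keyedRecordPieces:

import org.apache.spark.storage.StorageLevel

val keyed = sc.parallelize(Seq(("k1", "piece-a"), ("k2", "piece-b")))
keyed.persist(StorageLevel.MEMORY_AND_DISK_SER)  // persist() with no argument is MEMORY_ONLY
keyed.count()                                    // the first action materializes the cached data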

Thanks,
Liquan



On Tue, Sep 23, 2014 at 2:08 PM, Arun Ahuja aahuj...@gmail.com wrote:

 I have a general question on when persisting will be beneficial and when
 it won't:

 I have a task that runs as follow

 keyedRecordPieces = records.flatMap(record => Seq(key, recordPieces))
 partitioned = keyedRecordPieces.partitionBy(KeyPartitioner)

 partitioned.mapPartitions(doComputation).save()

 Is there value in having a persist somewhere here?  For example if the
 flatMap step is particularly expensive, will it ever be computed twice when
 there are no failures?

 Thanks

 Arun




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


Re: Memory compute-intensive tasks

2014-07-16 Thread Liquan Pei
Hi Ravi,

I have seen a similar issue before. You can try to set
fs.hdfs.impl.disable.cache to true in your Hadoop configuration. For
example, suppose your Hadoop Configuration object is hadoopConf; you can use
hadoopConf.setBoolean("fs.hdfs.impl.disable.cache", true).
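If you are setting this from within a Spark job, one hedged option is to go
through the SparkContext's Hadoop configuration:

// sc is the SparkContext; this mutates the Hadoop Configuration Spark uses
sc.hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true)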

Let me know if that helps.

Best,
Liquan


On Wed, Jul 16, 2014 at 4:56 PM, rpandya r...@iecommerce.com wrote:

 Matei - I tried using coalesce(numNodes, true), but it then seemed to run
 too
 few SNAP tasks - only 2 or 3 when I had specified 46. The job failed,
 perhaps for unrelated reasons, with some odd exceptions in the log (at the
 end of this message). But I really don't want to force data movement
 between
 nodes. The input data is in HDFS and should already be somewhat balanced
 among the nodes. We've run this scenario using the simple hadoop jar
 runner and a custom format jar to break the input into 8-line chunks
 (paired
 FASTQ). Ideally I'd like Spark to do the minimum data movement to balance
 the work, feeding each task mostly from data local to that node.

 Daniel - that's a good thought, I could invoke a small stub for each task
 that talks to a single local demon process over a socket, and serializes
 all
 the tasks on a given machine.

 Thanks,

 Ravi

 P.S. Log exceptions:

 14/07/15 17:02:00 WARN yarn.ApplicationMaster: Unable to retrieve
 SparkContext in spite of waiting for 10, maxNumTries = 10
 Exception in thread main java.lang.NullPointerException
 at

 org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkContextInitialized(ApplicationMaster.scala:233)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:110)

 ...and later...

 14/07/15 17:11:07 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM
 14/07/15 17:11:07 INFO yarn.ApplicationMaster: AppMaster received a signal.
 14/07/15 17:11:07 WARN rdd.NewHadoopRDD: Exception in RecordReader.close()
 java.io.IOException: Filesystem closed
 at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707)
 at
 org.apache.hadoop.hdfs.DFSInputStream.close(DFSInputStream.java:619)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Memory-compute-intensive-tasks-tp9643p9991.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst