Re: Spark 2.3.0 DataFrame.write.parquet() behavior change from 2.2.0

2018-05-07 Thread Victor Tso-Guillen
Found it: SPARK-21435
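
For readers hitting the same change, a minimal repro sketch (not from the thread,
assuming a SparkSession named `spark`):

  // 2.2.0 writes one empty part file per empty partition;
  // 2.3.0 (SPARK-21435) skips the empty ones entirely
  val df = spark.range(0).toDF("id").repartition(4)   // four empty partitions
  df.write.parquet("/tmp/empty-parts")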

On Mon, May 7, 2018 at 2:18 PM Victor Tso-Guillen <v...@paxata.com> wrote:

> It appears that between 2.2.0 and 2.3.0 DataFrame.write.parquet() skips
> writing empty parquet files for empty partitions. Is this configurable? Is
> there a Jira that tracks this change?
>
> Thanks,
> Victor
>


Spark 2.3.0 DataFrame.write.parquet() behavior change from 2.2.0

2018-05-07 Thread Victor Tso-Guillen
It appears that between 2.2.0 and 2.3.0 DataFrame.write.parquet() skips
writing empty parquet files for empty partitions. Is this configurable? Is
there a Jira that tracks this change?

Thanks,
Victor


Re: Any Idea about this error : IllegalArgumentException: File segment length cannot be negative ?

2017-04-20 Thread Victor Tso-Guillen
Along with Priya's email slightly earlier than this one, we are also seeing
this happen on Spark 1.5.2.

On Wed, Jul 13, 2016 at 1:26 AM Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> In a Spark Streaming job, I see a batch fail with the following error. I haven't
> seen anything like this before.
>
> It happened during the shuffle for one batch (it hasn't recurred since). Just
> curious to know what can cause this error. I am running Spark
> 1.5.1
>
> Regards,
> Dibyendu
>
>
> Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4 times, 
> most recent failure: Lost task 2801.3 in stage 9421.0: 
> java.lang.IllegalArgumentException: requirement failed: File segment length 
> cannot be negative (got -68321)
>   at scala.Predef$.require(Predef.scala:233)
>   at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
>   at 
> org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
>   at 
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Re: Need some guidance

2015-04-14 Thread Victor Tso-Guillen
Thanks, yes. I was using Int for my V and didn't get the second param in
the second closure right :)

On Mon, Apr 13, 2015 at 1:55 PM, Dean Wampler deanwamp...@gmail.com wrote:

 That appears to work, with a few changes to get the types correct:

 input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) =>
 agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)
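
 For reference, an end-to-end sketch (not from the thread) of the same approach on
 Marco's sample data, assuming an existing SparkContext `sc`:

   val input = sc.parallelize(Seq(
     (1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
     (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")))
   // drop duplicate (key, value) pairs, then count one per surviving distinct value
   val counts = input.distinct().combineByKey(
     (s: String) => 1,                       // first distinct value for a key counts as 1
     (agg: Int, s: String) => agg + 1,       // each further distinct value adds 1
     (agg1: Int, agg2: Int) => agg1 + agg2)  // merge per-partition counts
   counts.collect()   // Array((1,3), (2,2), (3,1)), order may vary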

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 How about this?

 input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
 (agg1: Int, agg2: Int) => agg1 + agg2).collect()

 On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 The problem with using collect is that it will fail for large data sets,
 as you'll attempt to copy the entire RDD to the memory of your driver
 program. The following works (Scala syntax, but similar to Python):

 scala> val i1 = input.distinct.groupByKey
 scala> i1.foreach(println)
 (1,CompactBuffer(beta, alpha, foo))
 (3,CompactBuffer(foo))
 (2,CompactBuffer(alpha, bar))

 scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
 scala> i2.foreach(println)
 (1,3)
 (3,1)
 (2,2)

 The i2 line passes a function that takes a tuple argument, then
 constructs a new output tuple with the first element and the size of the
 second (each CompactBuffer). An alternative pattern-match syntax would be:

 scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

 This should work as long as none of the CompactBuffers are too large,
 which could happen for extremely large data sets.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com
 wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark
 effectively?  I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco







Re: Need some guidance

2015-04-13 Thread Victor Tso-Guillen
How about this?

input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
(agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com
wrote:

 The problem with using collect is that it will fail for large data sets,
 as you'll attempt to copy the entire RDD to the memory of your driver
 program. The following works (Scala syntax, but similar to Python):

 scala> val i1 = input.distinct.groupByKey
 scala> i1.foreach(println)
 (1,CompactBuffer(beta, alpha, foo))
 (3,CompactBuffer(foo))
 (2,CompactBuffer(alpha, bar))

 scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
 scala> i2.foreach(println)
 (1,3)
 (3,1)
 (2,2)

 The i2 line passes a function that takes a tuple argument, then
 constructs a new output tuple with the first element and the size of the
 second (each CompactBuffer). An alternative pattern-match syntax would be:

 scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

 This should work as long as none of the CompactBuffers are too large,
 which could happen for extremely large data sets.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark
 effectively?  I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco





Re: 'nested' RDD problem, advise needed

2015-03-22 Thread Victor Tso-Guillen
Something like this?

(2 to alphabetLength toList).map(shift => Object.myFunction(inputRDD,
shift).map(v => shift -> v)).foldLeft(Object.myFunction(inputRDD, 1).map(v
=> 1 -> v))(_ union _)

which is an RDD[(Int, Char)]

Problem is that you can't play with RDDs inside of RDDs. The recursive
structure breaks the Spark programming model.
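
A minimal sketch of that driver-side pattern (using the names from the message
below; Foo.myFunction stands in for the per-shift transformation): build one RDD
per shift on the driver and union them there, rather than creating RDDs inside
another RDD's closure:

  // assumes sc: SparkContext, inputRDD: RDD[Char], Foo.myFunction(rdd, shift): RDD[Char]
  val alphabetLength = 26
  val tagged = sc.union(
    (1 to alphabetLength).map { shift =>
      Foo.myFunction(inputRDD, shift).map(c => shift -> c)   // RDD[(Int, Char)]
    })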

On Sat, Mar 21, 2015 at 10:26 AM, Michael Lewis lewi...@me.com wrote:

 Hi,

 I wonder if someone can help suggest a solution to my problem, I had a
 simple process working using Strings and now
 want to convert to RDD[Char], the problem is when I end up with a nested
 call as follow:


 1) Load a text file into an RDD[Char]

 val inputRDD = sc.textFile(“myFile.txt”).flatMap(_.toIterator)


 2) I have a method that takes two parameters:

 object Foo
 {
 def myFunction(inputRDD: RDD[Char], shift: Int): RDD[Char]
 ...


 3) I have a method that the driver process calls once its loaded the
 inputRDD ‘bar’ as follows:

 def bar(inputRDD: RDD[Char]): Int = {

  val solutionSet = sc.parallelize(1 to alphabetLength
 toList).map(shift => (shift, Object.myFunction(inputRDD, shift)))



 What I’m trying to do is take a list 1..26 and generate a set of tuples {
 (1,RDD(1)), …. (26,RDD(26)) }  which is the inputRDD passed through
 the function above, but with different set of shift parameters.

 In my original I could parallelise the algorithm fine, but my input string
 had to be in a ‘String’ variable, I’d rather it be an RDD
 (string could be large). I think the way I’m trying to do it above won’t
 work because its a nested RDD call.

 Can anybody suggest a solution?

 Regards,
 Mike Lewis





 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Build with Hadoop 2.6, yarn - encounter java.lang.NoClassDefFoundError: org/codehaus/jackson/map/deser/std/StdDeserializer

2015-03-05 Thread Victor Tso-Guillen
That particular class you did find is under parquet/... which means it was
shaded. Did you build your application against a hadoop2.6 dependency? The
maven central repo only has 2.2 but HDP has its own repos.

On Thu, Mar 5, 2015 at 10:04 AM, Todd Nist tsind...@gmail.com wrote:

 I am running Spark on a Hortonworks HDP cluster. I have deployed the
 prebuilt version, but it is only Spark 1.2.0, not 1.2.1, and there are a
 few fixes and features in there that I would like to leverage.

 I just downloaded the spark-1.2.1 source and built it to support Hadoop
 2.6 by doing the following:

 radtech:spark-1.2.1 tnist$ ./make-distribution.sh --name hadoop2.6 --tgz 
 -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver 
 -DskipTests clean package

 When I deploy this to my hadoop cluster and kick of a spark-shell,

 $ spark-1.2.1-bin-hadoop2.6]# ./bin/spark-shell --master yarn-client 
 --driver-memory 512m --executor-memory 512m

 Results in  java.lang.NoClassDefFoundError:
 org/codehaus/jackson/map/deser/std/StdDeserializer

 The full stack trace is below. I have validate that the
 $SPARK_HOME/lib/spark-assembly-1.2.1-hadoop2.6.0.jar does infact contain
 the class in question:

 jar -tvf spark-assembly-1.2.1-hadoop2.6.0.jar | grep 
 'org/codehaus/jackson/map/deser/std'
 ...
  18002 Thu Mar 05 11:23:04 EST 2015  
 parquet/org/codehaus/jackson/map/deser/std/StdDeserializer.class
   1584 Thu Mar 05 11:23:04 EST 2015 
 parquet/org/codehaus/jackson/map/deser/std/StdKeyDeserializer$BoolKD.class...

 Any guidance on what I missed? If I start the spark-shell standalone
 ($SPARK_HOME/bin/spark-shell) it comes up fine, so it looks to be related
 to starting it under YARN from what I can tell.

 TIA for the assistance.

 -Todd
 Stack Trace

 15/03/05 12:12:38 INFO spark.SecurityManager: Changing view acls to: root
 15/03/05 12:12:38 INFO spark.SecurityManager: Changing modify acls to: root
 15/03/05 12:12:38 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
 15/03/05 12:12:38 INFO spark.HttpServer: Starting HTTP Server
 15/03/05 12:12:39 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/03/05 12:12:39 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36176
 15/03/05 12:12:39 INFO util.Utils: Successfully started service 'HTTP class server' on port 36176.
 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
       /_/

 Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
 Type in expressions to have them evaluated.
 Type :help for more information.
 15/03/05 12:12:43 INFO spark.SecurityManager: Changing view acls to: root
 15/03/05 12:12:43 INFO spark.SecurityManager: Changing modify acls to: root
 15/03/05 12:12:43 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
 15/03/05 12:12:44 INFO slf4j.Slf4jLogger: Slf4jLogger started
 15/03/05 12:12:44 INFO Remoting: Starting remoting
 15/03/05 12:12:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkdri...@hadoopdev01.opsdatastore.com:50544]
 15/03/05 12:12:44 INFO util.Utils: Successfully started service 'sparkDriver' on port 50544.
 15/03/05 12:12:44 INFO spark.SparkEnv: Registering MapOutputTracker
 15/03/05 12:12:44 INFO spark.SparkEnv: Registering BlockManagerMaster
 15/03/05 12:12:44 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-16402794-cc1e-42d0-9f9c-99f15eaa1861/spark-118bc6af-4008-45d7-a22f-491bcd1856c0
 15/03/05 12:12:44 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
 15/03/05 12:12:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 15/03/05 12:12:45 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-5d7da34c-58d4-4d60-9b6a-3dce43cab39e/spark-4d65aacb-78bd-40fd-b6c0-53b47e288199
 15/03/05 12:12:45 INFO spark.HttpServer: Starting HTTP Server
 15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/03/05 12:12:45 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56452
 15/03/05 12:12:45 INFO util.Utils: Successfully started service 'HTTP file server' on port 56452.
 15/03/05 12:12:45 INFO server.Server: jetty-8.y.z-SNAPSHOT
 15/03/05 12:12:45 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
 15/03/05 12:12:45 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
 15/03/05 12:12:45 INFO ui.SparkUI: Started SparkUI at http://hadoopdev01.opsdatastore.com:4040
 15/03/05 12:12:46 INFO impl.TimelineClientImpl: Timeline service address: http://hadoopdev02.opsdatastore.com:8188/ws/v1/timeline/
 java.lang.NoClassDefFoundError: 

Re: Scheduler hang?

2015-02-28 Thread Victor Tso-Guillen
Moving user to bcc.

What I found was that the TaskSetManager for my task set that had 5 tasks
had preferred locations set for 4 of the 5. Three had localhost/driver
and had completed. The one that had nothing had also completed. The last
one was set by our code to be my IP address. Local mode can hang on this
because of https://issues.apache.org/jira/browse/SPARK-4939 addressed by
https://github.com/apache/spark/pull/4147, which is obviously not an
optimal solution, but since it's only local mode, it's good enough. I'm
not going to wait for those seconds to tick by to complete the task, so
I'll fix the IP address reporting side for local mode in my code.
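
A blunt way to sidestep locality-based scheduling stalls of this kind (an assumption,
not something the thread prescribes) is to disable the locality wait when running in
local mode:

  // hedged workaround sketch: never wait for a preferred-locality offer
  val conf = new org.apache.spark.SparkConf()
    .setMaster("local[8]")
    .set("spark.locality.wait", "0")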

On Thu, Feb 26, 2015 at 8:32 PM, Victor Tso-Guillen v...@paxata.com wrote:

 Of course, breakpointing on every status update and revive offers
 invocation kept the problem from happening. Where could the race be?

 On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Love to hear some input on this. I did get a standalone cluster up on my
 local machine and the problem didn't present itself. I'm pretty confident
 that means the problem is in the LocalBackend or something near it.

 On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Okay I confirmed my suspicions of a hang. I made a request that stopped
 progressing, though the already-scheduled tasks had finished. I made a
 separate request that was small enough not to hang, and it kicked the hung
 job enough to finish. I think what's happening is that the scheduler or the
 local backend is not kicking the revive offers messaging at the right time,
 but I have to dig into the code some more to nail the culprit. Anyone on
 this list have experience in those code areas that could help?

 On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Thanks for the link. Unfortunately, I turned on rdd compression and
 nothing changed. I tried moving netty -> nio and no change :(

 On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Not many that i know of, but i bumped into this one
 https://issues.apache.org/jira/browse/SPARK-4516

 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
 dependencies that produce no data?

 On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will
 be gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which
 the system appears to hang. I'm worried about some sort of message loss 
 or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're
 recommending this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 What operation are you trying to do and how big is the data that
 you are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
 v...@paxata.com wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in
 local mode with parallelism at 8. I have 222 tasks and I never seem 
 to get
 far past 40. Usually in the 20s to 30s it will just hang. The last 
 logging
 is below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch
 worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor












Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Okay I confirmed my suspicions of a hang. I made a request that stopped
progressing, though the already-scheduled tasks had finished. I made a
separate request that was small enough not to hang, and it kicked the hung
job enough to finish. I think what's happening is that the scheduler or the
local backend is not kicking the revive offers messaging at the right time,
but I have to dig into the code some more to nail the culprit. Anyone on
this list have experience in those code areas that could help?

On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com wrote:

 Thanks for the link. Unfortunately, I turned on rdd compression and
 nothing changed. I tried moving netty -> nio and no change :(

 On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Not many that i know of, but i bumped into this one
 https://issues.apache.org/jira/browse/SPARK-4516

 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
 dependencies that produce no data?

 On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will be
 gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which the
 system appears to hang. I'm worried about some sort of message loss or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're
 recommending this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 What operation are you trying to do and how big is the data that you
 are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in
 local mode with parallelism at 8. I have 222 tasks and I never seem to 
 get
 far past 40. Usually in the 20s to 30s it will just hang. The last 
 logging
 is below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes
 result sent to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch
 worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes
 result sent to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor









Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
The data is small. The job is composed of many small stages.

* I found that with fewer than 222 the problem exhibits. What will be
gained by going higher?
* Pushing up the parallelism only pushes up the boundary at which the
system appears to hang. I'm worried about some sort of message loss or
inconsistency.
* Yes, we are using Kryo.
* I'll try that, but I'm again a little confused why you're recommending
this. I'm stumped so might as well?

On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 What operation are you trying to do and how big is the data that you are
 operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))
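
 Taken together, those suggestions amount to roughly this SparkConf (a sketch, not
 from the thread):

   val conf = new org.apache.spark.SparkConf()
     .setMaster("local[*]")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.rdd.compress", "true")
   // plus repartitioning the RDD above 222 partitions, e.g. rdd.repartition(500)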


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in local
 mode with parallelism at 8. I have 222 tasks and I never seem to get far
 past 40. Usually in the 20s to 30s it will just hang. The last logging is
 below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-10]
 Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
 Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
 Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor





Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Thanks for the link. Unfortunately, I turned on rdd compression and nothing
changed. I tried moving netty -> nio and no change :(

On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Not many that i know of, but i bumped into this one
 https://issues.apache.org/jira/browse/SPARK-4516

 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
 dependencies that produce no data?

 On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will be
 gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which the
 system appears to hang. I'm worried about some sort of message loss or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're recommending
 this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What operation are you trying to do and how big is the data that you
 are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in
 local mode with parallelism at 8. I have 222 tasks and I never seem to get
 far past 40. Usually in the 20s to 30s it will just hang. The last logging
 is below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes
 result sent to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
 Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result 
 sent
 to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
 Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result 
 sent
 to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor








Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
dependencies that produce no data?

On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will be
 gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which the
 system appears to hang. I'm worried about some sort of message loss or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're recommending
 this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What operation are you trying to do and how big is the data that you are
 operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in local
 mode with parallelism at 8. I have 222 tasks and I never seem to get far
 past 40. Usually in the 20s to 30s it will just hang. The last logging is
 below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-10]
 Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
 Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
 Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent
 to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor






Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Love to hear some input on this. I did get a standalone cluster up on my
local machine and the problem didn't present itself. I'm pretty confident
that means the problem is in the LocalBackend or something near it.

On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com wrote:

 Okay I confirmed my suspicions of a hang. I made a request that stopped
 progressing, though the already-scheduled tasks had finished. I made a
 separate request that was small enough not to hang, and it kicked the hung
 job enough to finish. I think what's happening is that the scheduler or the
 local backend is not kicking the revive offers messaging at the right time,
 but I have to dig into the code some more to nail the culprit. Anyone on
 this list have experience in those code areas that could help?

 On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Thanks for the link. Unfortunately, I turned on rdd compression and
 nothing changed. I tried moving netty -> nio and no change :(

 On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Not many that i know of, but i bumped into this one
 https://issues.apache.org/jira/browse/SPARK-4516

 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
 dependencies that produce no data?

 On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will be
 gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which the
 system appears to hang. I'm worried about some sort of message loss or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're
 recommending this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 What operation are you trying to do and how big is the data that you
 are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen v...@paxata.com
  wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in
 local mode with parallelism at 8. I have 222 tasks and I never seem to 
 get
 far past 40. Usually in the 20s to 30s it will just hang. The last 
 logging
 is below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch
 worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor










Re: Scheduler hang?

2015-02-26 Thread Victor Tso-Guillen
Of course, breakpointing on every status update and revive offers
invocation kept the problem from happening. Where could the race be?

On Thu, Feb 26, 2015 at 7:55 PM, Victor Tso-Guillen v...@paxata.com wrote:

 Love to hear some input on this. I did get a standalone cluster up on my
 local machine and the problem didn't present itself. I'm pretty confident
 that means the problem is in the LocalBackend or something near it.

 On Thu, Feb 26, 2015 at 1:37 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Okay I confirmed my suspicions of a hang. I made a request that stopped
 progressing, though the already-scheduled tasks had finished. I made a
 separate request that was small enough not to hang, and it kicked the hung
 job enough to finish. I think what's happening is that the scheduler or the
 local backend is not kicking the revive offers messaging at the right time,
 but I have to dig into the code some more to nail the culprit. Anyone on
 this list have experience in those code areas that could help?

 On Thu, Feb 26, 2015 at 2:27 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Thanks for the link. Unfortunately, I turned on rdd compression and
 nothing changed. I tried moving netty -> nio and no change :(

 On Thu, Feb 26, 2015 at 2:01 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Not many that i know of, but i bumped into this one
 https://issues.apache.org/jira/browse/SPARK-4516

 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 3:26 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is there any potential problem from 1.1.1 to 1.2.1 with shuffle
 dependencies that produce no data?

 On Thu, Feb 26, 2015 at 1:56 AM, Victor Tso-Guillen v...@paxata.com
 wrote:

 The data is small. The job is composed of many small stages.

 * I found that with fewer than 222 the problem exhibits. What will be
 gained by going higher?
 * Pushing up the parallelism only pushes up the boundary at which the
 system appears to hang. I'm worried about some sort of message loss or
 inconsistency.
 * Yes, we are using Kryo.
 * I'll try that, but I'm again a little confused why you're
 recommending this. I'm stumped so might as well?

 On Wed, Feb 25, 2015 at 11:13 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 What operation are you trying to do and how big is the data that you
 are operating on?

 Here's a few things which you can try:

 - Repartition the RDD to a higher number than 222
 - Specify the master as local[*] or local[10]
 - Use Kryo Serializer (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))
 - Enable RDD Compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Thu, Feb 26, 2015 at 10:15 AM, Victor Tso-Guillen 
 v...@paxata.com wrote:

 I'm getting this really reliably on Spark 1.2.1. Basically I'm in
 local mode with parallelism at 8. I have 222 tasks and I never seem to 
 get
 far past 40. Usually in the 20s to 30s it will just hang. The last 
 logging
 is below, and a screenshot of the UI.

 2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
 TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
 localhost (1/5)
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-10] Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch
 worker-8] Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
 TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
 localhost (2/5)
 2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
 TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
 localhost (3/5)
 2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch
 worker-9] Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 
 bytes
 result sent to driver
 2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
 TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
 localhost (4/5)

 [image: Inline image 1]
 What should I make of this? Where do I start?

 Thanks,
 Victor











Scheduler hang?

2015-02-25 Thread Victor Tso-Guillen
I'm getting this really reliably on Spark 1.2.1. Basically I'm in local
mode with parallelism at 8. I have 222 tasks and I never seem to get far
past 40. Usually in the 20s to 30s it will just hang. The last logging is
below, and a screenshot of the UI.

2015-02-25 20:39:55.779 GMT-0800 INFO  [task-result-getter-3]
TaskSetManager - Finished task 3.0 in stage 16.0 (TID 22) in 612 ms on
localhost (1/5)
2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-10]
Executor - Finished task 1.0 in stage 16.0 (TID 20). 2492 bytes result sent
to driver
2015-02-25 20:39:55.825 GMT-0800 INFO  [Executor task launch worker-8]
Executor - Finished task 2.0 in stage 16.0 (TID 21). 2492 bytes result sent
to driver
2015-02-25 20:39:55.831 GMT-0800 INFO  [task-result-getter-0]
TaskSetManager - Finished task 1.0 in stage 16.0 (TID 20) in 670 ms on
localhost (2/5)
2015-02-25 20:39:55.836 GMT-0800 INFO  [task-result-getter-1]
TaskSetManager - Finished task 2.0 in stage 16.0 (TID 21) in 674 ms on
localhost (3/5)
2015-02-25 20:39:55.891 GMT-0800 INFO  [Executor task launch worker-9]
Executor - Finished task 0.0 in stage 16.0 (TID 19). 2492 bytes result sent
to driver
2015-02-25 20:39:55.896 GMT-0800 INFO  [task-result-getter-2]
TaskSetManager - Finished task 0.0 in stage 16.0 (TID 19) in 740 ms on
localhost (4/5)

[image: Inline image 1]
What should I make of this? Where do I start?

Thanks,
Victor


Re: heterogeneous cluster setup

2014-12-04 Thread Victor Tso-Guillen
To reiterate, it's very important for Spark's workers to have the same
memory available. Think about Spark uniformly chopping up your data and
distributing the work to the nodes. The algorithm is not designed to
consider that a worker has less memory available than some other worker.

On Thu, Dec 4, 2014 at 12:11 AM, rapelly kartheek kartheek.m...@gmail.com
wrote:


 *It's very important for Spark's workers to have the same resources
 available*

 So, each worker should have same amount of memory and same number of
 cores. But, heterogeneity of the cluster in the physical layout of cpu is
 understandable, but how about heterogeneity with respect to memory?

 On Thu, Dec 4, 2014 at 12:18 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 You'll have to decide which is more expensive in your heterogenous
 environment and optimize for the utilization of that. For example, you may
 decide that memory is the only costing factor and you can discount the
 number of cores. Then you could have 8GB on each worker each with four
 cores. Note that cores in Spark don't necessarily map to cores on the
 machine. It's just a configuration setting for how many simultaneous tasks
 that worker can work on.

 You are right that each executor gets the same amount of resources and I
 would add level of parallelization. Your heterogeneity is in the physical
 layout of your cluster, not in how Spark treats the workers as resources.
 It's very important for Spark's workers to have the same resources
 available because it needs to be able to generically divide and conquer
 your data amongst all those workers.

 Hope that helps,
 Victor

 On Wed, Dec 3, 2014 at 10:04 PM, rapelly kartheek 
 kartheek.m...@gmail.com wrote:

 Thank you so much for valuable reply, Victor. That's a very clear
 solution I understood.

 Right now I have nodes with:
 16GB RAM, 4 cores; 8GB RAM, 4 cores; 8GB RAM, 2 cores. From my
 understanding, the division could be something like, each executor can have
 2 cores and 6GB RAM.   So, the ones with 16GB RAM and 4 cores can have two
 executors. Please let me know if my understanding is correct.

 But, I am not able to see  any heterogeneity in this setting as each
 executor has got the same amount of resources. Can you please clarify this
 doubt?

 Regards
 Karthik

 On Wed, Dec 3, 2014 at 11:11 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 I don't have a great answer for you. For us, we found a common divisor,
 not necessarily a whole gigabyte, of the available memory of the different
 hardware and used that as the amount of memory per worker and scaled the
 number of cores accordingly so that every core in the system has the same
 amount of memory. The quotient of the available memory and the common
 divisor, hopefully a whole number to reduce waste, was the number of
 workers we spun up. Therefore, if you have 64G, 30G, and 15G available
 memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1
 worker per machine. Every worker on all the machines would have the same
 number of cores, set to what you think is a good value.

 Hope that helps.

 On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote:

 Hi Victor,

 I want to setup a heterogeneous stand-alone spark cluster. I have
 hardware with different memory sizes and varied number of cores per node. 
 I
 could get all the nodes active in the cluster only when the size of memory
 per executor is set as the least available memory size of all nodes and is
 same with no.of cores/executor. As of now, I configure one executor per
 node.

 Can you please suggest some path to set up a stand-alone
 heterogeneous  cluster such that I can efficiently use the available
 hardware.

 Thank you




 _
 Sent from http://apache-spark-user-list.1001560.n3.nabble.com








Re: heterogeneous cluster setup

2014-12-03 Thread Victor Tso-Guillen
I don't have a great answer for you. For us, we found a common divisor, not
necessarily a whole gigabyte, of the available memory of the different
hardware and used that as the amount of memory per worker and scaled the
number of cores accordingly so that every core in the system has the same
amount of memory. The quotient of the available memory and the common
divisor, hopefully a whole number to reduce waste, was the number of
workers we spun up. Therefore, if you have 64G, 30G, and 15G available
memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1
worker per machine. Every worker on all the machines would have the same
number of cores, set to what you think is a good value.
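
A tiny worked version of that arithmetic, using the 64G/30G/15G figures above and a
15G divisor (a sketch, not from the thread):

  val divisorGB = 15
  val machinesGB = Seq(64, 30, 15)
  val workersPerMachine = machinesGB.map(_ / divisorGB)   // Seq(4, 2, 1)
  // each worker gets divisorGB of memory and the same fixed number of cores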

Hope that helps.

On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote:

 Hi Victor,

 I want to setup a heterogeneous stand-alone spark cluster. I have hardware
 with different memory sizes and varied number of cores per node. I could
 get all the nodes active in the cluster only when the size of memory per
 executor is set as the least available memory size of all nodes and is same
 with no.of cores/executor. As of now, I configure one executor per node.

 Can you please suggest some path to set up a stand-alone heterogeneous
 cluster such that I can efficiently use the available hardware.

 Thank you




 _
 Sent from http://apache-spark-user-list.1001560.n3.nabble.com




Re: heterogeneous cluster setup

2014-12-03 Thread Victor Tso-Guillen
You'll have to decide which is more expensive in your heterogenous
environment and optimize for the utilization of that. For example, you may
decide that memory is the only costing factor and you can discount the
number of cores. Then you could have 8GB on each worker each with four
cores. Note that cores in Spark don't necessarily map to cores on the
machine. It's just a configuration setting for how many simultaneous tasks
that worker can work on.

You are right that each executor gets the same amount of resources and I
would add level of parallelization. Your heterogeneity is in the physical
layout of your cluster, not in how Spark treats the workers as resources.
It's very important for Spark's workers to have the same resources
available because it needs to be able to generically divide and conquer
your data amongst all those workers.

Hope that helps,
Victor

On Wed, Dec 3, 2014 at 10:04 PM, rapelly kartheek kartheek.m...@gmail.com
wrote:

 Thank you so much for valuable reply, Victor. That's a very clear solution
 I understood.

 Right now I have nodes with:
 16GB RAM, 4 cores; 8GB RAM, 4 cores; 8GB RAM, 2 cores. From my
 understanding, the division could be something like, each executor can have
 2 cores and 6GB RAM.   So, the ones with 16GB RAM and 4 cores can have two
 executors. Please let me know if my understanding is correct.

 But, I am not able to see  any heterogeneity in this setting as each
 executor has got the same amount of resources. Can you please clarify this
 doubt?

 Regards
 Karthik

 On Wed, Dec 3, 2014 at 11:11 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 I don't have a great answer for you. For us, we found a common divisor,
 not necessarily a whole gigabyte, of the available memory of the different
 hardware and used that as the amount of memory per worker and scaled the
 number of cores accordingly so that every core in the system has the same
 amount of memory. The quotient of the available memory and the common
 divisor, hopefully a whole number to reduce waste, was the number of
 workers we spun up. Therefore, if you have 64G, 30G, and 15G available
 memory on your machines, the divisor could be 15G and you'd have 4, 2 and 1
 worker per machine. Every worker on all the machines would have the same
 number of cores, set to what you think is a good value.

 Hope that helps.

 On Wed, Dec 3, 2014 at 7:44 AM, kartheek.m...@gmail.com wrote:

 Hi Victor,

 I want to setup a heterogeneous stand-alone spark cluster. I have
 hardware with different memory sizes and varied number of cores per node. I
 could get all the nodes active in the cluster only when the size of memory
 per executor is set as the least available memory size of all nodes and is
 same with no.of cores/executor. As of now, I configure one executor per
 node.

 Can you please suggest some path to set up a stand-alone heterogeneous
 cluster such that I can efficiently use the available hardware.

 Thank you




 _
 Sent from http://apache-spark-user-list.1001560.n3.nabble.com






Re: Parallelize independent tasks

2014-12-02 Thread Victor Tso-Guillen
dirs.par.foreach { case (src,dest) =>
sc.textFile(src).process.saveAsFile(dest) }

Is that sufficient for you?
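
If not, a slightly fuller sketch of the same idea (hypothetical paths; `process`
stands in for the real per-record transformation), running the independent jobs
concurrently from the driver:

  // assumes sc: SparkContext and a transformation process: String => String
  val dirs = Seq("hdfs:///in/a" -> "hdfs:///out/a", "hdfs:///in/b" -> "hdfs:///out/b")
  dirs.par.foreach { case (src, dest) =>
    // each pair becomes its own Spark job, submitted from its own thread
    sc.textFile(src).map(process).saveAsTextFile(dest)
  }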

On Tuesday, December 2, 2014, Anselme Vignon anselme.vig...@flaminem.com
wrote:

 Hi folks,


 We have written a spark job that scans multiple hdfs directories and
 perform transformations on them.

 For now, this is done with a simple for loop that starts one task at
 each iteration. This looks like:

 dirs.foreach { case (src,dest) =>
 sc.textFile(src).process.saveAsFile(dest) }


 However, each iteration is independent, and we would like to optimize
 that by running
 them with spark simultaneously (or in a chained fashion), such that we
 don't have
 idle executors at the end of each iteration (some directories
 sometimes only require one partition)


 Has anyone already done such a thing? How would you suggest we could do
 that?

 Cheers,

 Anselme

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:;
 For additional commands, e-mail: user-h...@spark.apache.org javascript:;




Re: java.lang.NumberFormatException while starting spark-worker

2014-09-24 Thread Victor Tso-Guillen
Do you have a newline or some other strange character in an argument you
pass to spark that includes that 61608?
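
A one-liner illustrating the failure mode being suggested (a sketch, not from the
thread): stray whitespace in an otherwise numeric argument breaks the parse.

  "61608".toInt     // parses fine
  "61608\n".toInt   // throws java.lang.NumberFormatException, as in the trace below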

On Wed, Sep 24, 2014 at 4:11 AM, jishnu.prat...@wipro.com wrote:

  Hi ,
*I am getting this weird error while starting Worker. *

 -bash-4.1$ spark-class org.apache.spark.deploy.worker.Worker
 spark://osebi-UServer:59468
 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 14/09/24 16:22:04 INFO worker.Worker: Registered signal handlers for
 [TERM, HUP, INT]
 Exception in thread "main" java.lang.NumberFormatException: For input
 string: 61608
 at
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

 at java.lang.Integer.parseInt(Integer.java:492)
 at java.lang.Integer.parseInt(Integer.java:527)
 at
 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
 at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
 at
 org.apache.spark.deploy.worker.WorkerArguments.<init>(WorkerArguments.scala:38)

 at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:376)
 at org.apache.spark.deploy.worker.Worker.main(Worker.scala)




java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2014-09-24 Thread Victor Tso-Guillen
Really? What should we make of this?

24 Sep 2014 10:03:36,772 ERROR [Executor task launch worker-52] Executor -
Exception in task ID 40599

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)

at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)

at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)

at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:341)

at org.apache.spark.storage.BlockManager.get(BlockManager.scala:508)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:96)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)

at org.apache.spark.scheduler.Task.run(Task.scala:51)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:724)

24 Sep 2014 10:03:40,936 ERROR [pool-1-thread-7] BlockManagerWorker -
Exception handling buffer message

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)

at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)

at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)

at
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:359)

at
org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)

at
org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)

at
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)

at
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at
org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)

at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at
org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)

at
org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)

at
org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)

at
org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)

at org.apache.spark.network.ConnectionManager.org
$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)

at
org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:724)


Re: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

2014-09-24 Thread Victor Tso-Guillen
Never mind: https://issues.apache.org/jira/browse/SPARK-1476

On Wed, Sep 24, 2014 at 11:10 AM, Victor Tso-Guillen v...@paxata.com
wrote:

 Really? What should we make of this?

 24 Sep 2014 10:03:36,772 ERROR [Executor task launch worker-52] Executor - Exception in task ID 40599
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
     at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:341)
     at org.apache.spark.storage.BlockManager.get(BlockManager.scala:508)
     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:96)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
     at org.apache.spark.scheduler.Task.run(Task.scala:51)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:724)

 24 Sep 2014 10:03:40,936 ERROR [pool-1-thread-7] BlockManagerWorker - Exception handling buffer message
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:789)
     at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:415)
     at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:359)
     at org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:90)
     at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:69)
     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:44)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
     at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
     at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
     at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:44)
     at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
     at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:34)
     at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:662)
     at org.apache.spark.network.ConnectionManager$$anon$9.run(ConnectionManager.scala:504)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:724)



Re: Sorting a Table in Spark RDD

2014-09-23 Thread Victor Tso-Guillen
You could pluck out each column in separate rdds, sort them independently,
and zip them :)
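
Roughly, and untested (the numbers below are the example table from the
question; sortBy, zipWithIndex, and join are used instead of zip so the
re-pairing does not depend on partition alignment):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

def sortColumns(sc: SparkContext): Unit = {
  // the table from the question, one (col1, col2) pair per row
  val rows = sc.parallelize(Seq((10, 11), (12, 8), (9, 13), (2, 3)), 2)

  // sort each column on its own
  val col1 = rows.map(_._1).sortBy(identity)
  val col2 = rows.map(_._2).sortBy(identity)

  // pair the sorted columns back up by position
  val sortedTable = col1.zipWithIndex().map(_.swap)
    .join(col2.zipWithIndex().map(_.swap))
    .sortByKey()
    .values

  sortedTable.collect().foreach(println) // (2,3), (9,8), (10,11), (12,13)
}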

On Tue, Sep 23, 2014 at 2:40 PM, Areg Baghdasaryan (BLOOMBERG/ 731 LEX -) 
abaghdasa...@bloomberg.net wrote:

 Hello,

 So I have created a table in an RDD in Spark in this format:
 col1 col2
 ---
 1. 10 11
 2. 12 8
 3. 9 13
 4. 2 3

 And the RDD is distributed by the rows (rows 1, 2 on one node and rows 3, 4
 on another)
 I want to sort each column of the table so that that output is the
 following:


 col1 col2
 ---
 1. 2 3
 2. 9 8
 3. 10 11
 4. 12 13

 Is there an easy way to do this with Spark RDDs? The only way that I can
 think of so far is to transpose the table somehow.

 Thanks
 Areg



Re: Reproducing the function of a Hadoop Reducer

2014-09-20 Thread Victor Tso-Guillen
So sorry about teasing you with the Scala. But the method is there in Java
too, I just checked.

On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen v...@paxata.com wrote:

 It might not be the same as a real hadoop reducer, but I think it would
 accomplish the same. Take a look at:

 import org.apache.spark.SparkContext._
 // val rdd: RDD[(K, V)]
 // def zero(value: V): S
 // def reduce(agg: S, value: V): S
 // def merge(agg1: S, agg2: S): S
 val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[S](zero, reduce, merge)
 reducedUnsorted.sortByKey()

 On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:

  I am struggling to reproduce the functionality of a Hadoop reducer on
 Spark (in Java)

 in Hadoop I have a function
 public void doReduce(K key, Iterator<V> values)
 in Hadoop there is also a consumer (context write) which can be seen as
 consume(key,value)

 In my code
 1) knowing the key is important to the function
 2) there is neither one output tuple2 per key nor one output tuple2 per
 value
 3) the number of values per key might be large enough that storing them
 in memory is impractical
 4) keys must appear in sorted order

 one good example would run through a large document using a similarity
 function to look at the last 200 lines and output any of those with a
 similarity of more than 0.3 (do not suggest output all and filter - the
 real problem is more complex) the critical concern is an uncertain number
 of tuples per key.

 my questions
 1) how can this be done - ideally a consumer would be a JavaPairRDD but I
 don't see how to create one and add items later

 2) how do I handle the entire partition, sort, process (involving calls
 to doReduce) process







Re: Reproducing the function of a Hadoop Reducer

2014-09-20 Thread Victor Tso-Guillen
   1. Actually, I disagree that combineByKey requires that all values be
   held in memory for a key. Only the use case groupByKey does that, whereas
   reduceByKey, foldByKey, and the generic combineByKey do not necessarily
   make that requirement. If your combine logic really shrinks the result
   value by a lot, I think it would be worth it to make sure mapSideCombine is
   true.
   2. In order to get the key into the combine logic, you may need to
   project it into a (K, (K, V)). I'm not sure there's a method that otherwise
   provides the information you're asking for. Unfortunately, that is a lot
   more heavyweight.
   3. If you absolutely need the keys in sorted order before you combine,
   then perhaps you could sortByKey before doing your combineByKey, but you
   pay the price of a bigger shuffle doing it that way.

I hope that helps. If not, perhaps you can sketch out in more detail what
you're trying to accomplish and I or someone else can guide you through.
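
For (2), a rough untested sketch of that projection; the Summary type and the
counting logic are just placeholders for whatever your real reduce does:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

case class Summary(key: String, count: Int)

def combineKeepingKey(rdd: RDD[(String, Int)]): RDD[(String, Summary)] =
  // project (K, V) into (K, (K, V)) so the combiner functions can see the key
  rdd.map { case (k, v) => (k, (k, v)) }
    .combineByKey[Summary](
      (kv: (String, Int)) => Summary(kv._1, 1),
      (agg: Summary, kv: (String, Int)) => agg.copy(count = agg.count + 1),
      (a: Summary, b: Summary) => a.copy(count = a.count + b.count))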

Cheers,
Y

On Sat, Sep 20, 2014 at 11:09 AM, Steve Lewis lordjoe2...@gmail.com wrote:

 OK so in Java - pardon the verbosity I might say something like the code
 below
 but I face the following issues
 1) I need to store all values in memory as I run combineByKey - if I could
 return an RDD which consumed values that would be great but I don't know
 how to do that -
 2) In my version of the functions I get a tuple so I know the key but all
 of Scala's functions for byKey do not make the key available - this may
 work for a trivial function like wordcount but the code I want to port
 needs to know the key when processing values
 3) it is important that I have control over partitioning - I can do that
 with mapPartitions, but it is also important that within a partition keys be
 received in sorted order - easy if every partition could be a separate RDD
 combined later, but I am not sure how that works.

 in Hadoop when I reduce the values for each key I get an iterator and do
 not need to keep all values in memory. Similarly while the output in Hadoop
 is written to disk as key values in Spark it could populate a JavaPairRDD
 if there were a way to do that lazily

 One other issue - I don't see a good way to say a merge function is
 finished - i.e. no further data is coming in which would be useful in
 processing steps.



 /**
  * a class to store a key and all its values using an array list
  * @param <K> key type
  * @param <V> value type
  */
 public static class KeyAndValues<K, V> {
     public final K key;
     private final ArrayList<V> values = new ArrayList<V>();

     public KeyAndValues(final K pKey) {
         key = pKey;
     }

     public void addValue(V added) {
         values.add(added);
     }

     public Iterable<V> getIterable() {
         return values;
     }

     public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
         values.addAll(merged.values);
         return this;
     }
 }

 // start function for combine by key - gets key from first tuple
 public static class CombineStartKeyAndValues<K, V> implements
         Function<Tuple2<K, V>, KeyAndValues<K, V>> {
     public KeyAndValues<K, V> call(Tuple2<K, V> x) {
         KeyAndValues<K, V> ret = new KeyAndValues<K, V>(x._1());
         ret.addValue(x._2());
         return ret;
     }
 }

 // continue function for combine by key - adds values to the array
 public static class CombineContinueKeyAndValues<K, V> implements
         Function2<KeyAndValues<K, V>, Tuple2<K, V>, KeyAndValues<K, V>> {
     public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs,
                                    final Tuple2<K, V> added) throws Exception {
         kvs.addValue(added._2());
         return kvs;
     }
 }

 // merge function - merges the accumulated arrays
 // NOTE there is no signal to say merge is done
 public static class CombineMergeKeyAndValues<K, V> implements
         Function2<KeyAndValues<K, V>, KeyAndValues<K, V>, KeyAndValues<K, V>> {
     public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1,
                                    final KeyAndValues<K, V> v2) throws Exception {
         return v1.merge(v2); // combine the two partial groupings
     }
 }

 On Fri, Sep 19, 2014 at 11:19 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 So sorry about teasing you with the Scala. But the method is there in
 Java too, I just checked.

 On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 It might not be the same as a real hadoop reducer, but I think it would
 accomplish the same. Take a look at:

 import org.apache.spark.SparkContext._
 // val rdd: RDD[(K, V)]
 // def zero(value: V): S
 // def reduce(agg: S, value: V): S
 // def merge(agg1: S, agg2: S): S
 val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[Int](zero, reduce,
 merge)
 reducedUnsorted.sortByKey()

 On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:

  I am struggling to reproduce the functionality of a Hadoop reducer on
 Spark (in Java)

 in Hadoop I have a function
 public

Re: diamond dependency tree

2014-09-19 Thread Victor Tso-Guillen
Yes, sorry I meant DAG. I fixed it in my message but not the subject. The
terminology of leaf wasn't helpful I know so hopefully my visual example
was enough. Anyway, I noticed what you said in a local-mode test. I can try
that in a cluster, too. Thank you!

On Thu, Sep 18, 2014 at 10:28 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 Is it possible to express a diamond DAG and have the leaf dependency
 evaluate only once?


 Well, strictly speaking your graph is not a tree, and also the meaning
 of leaf is not totally clear, I'd say.


 So say data flows left to right (and the dependencies are oriented right
 to left):

 [image: Inline image 1]
 Is it possible to run d.collect() and have a evaluate its iterator only
 once?


 If you say a.cache() (or a.persist()) then it will be evaluated only once
 and then the cached data will be used for later accesses.

 Tobias



diamond dependency tree

2014-09-18 Thread Victor Tso-Guillen
Is it possible to express a diamond DAG and have the leaf dependency
evaluate only once? So say data flows left to right (and the dependencies
are oriented right to left):

[image: Inline image 1]
Is it possible to run d.collect() and have a evaluate its iterator only
once?
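
A minimal sketch of the shape being asked about, untested and with
illustrative transformations only (sc is assumed to be an existing
SparkContext); as the reply above notes, a.cache() is what keeps a from being
recomputed once per branch:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

def diamond(sc: SparkContext): Unit = {
  val a = sc.parallelize(1 to 1000).map(i => (i % 10, i)).cache() // evaluated once
  val b = a.reduceByKey(_ + _)               // shuffle dependency on a
  val c = a.groupByKey().mapValues(_.size)   // another shuffle dependency on a
  val d = b.join(c)
  d.collect()
}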


Re: diamond dependency tree

2014-09-18 Thread Victor Tso-Guillen
Caveat: all arrows are shuffle dependencies.

On Thu, Sep 18, 2014 at 8:55 PM, Victor Tso-Guillen v...@paxata.com wrote:

 Is it possible to express a diamond DAG and have the leaf dependency
 evaluate only once? So say data flows left to right (and the dependencies
 are oriented right to left):

 [image: Inline image 1]
 Is it possible to run d.collect() and have a evaluate its iterator only
 once?



Re: Configuring Spark for heterogeneous hardware

2014-09-17 Thread Victor Tso-Guillen
I'm supposing that there's no good solution to having heterogeneous hardware
in a cluster. What are the prospects of having something like this in the
future? Am I missing an architectural detail that precludes this
possibility?

Thanks,
Victor

On Fri, Sep 12, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com
wrote:

 Ping...

 On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 So I have a bunch of hardware with different core and memory setups. Is
 there a way to do one of the following:

 1. Express a ratio of cores to memory to retain. The spark worker config
 would represent all of the cores and all of the memory usable for any
 application, and the application would take a fraction that sustains the
 ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker
 take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing
 both out. If there were only 16G with the same ratio requirement, it would
 only take 3 cores and 12G in a single executor and leave the rest.

 2. Have the executor take whole number ratios of what it needs. Say it is
 configured for 2/8G and the worker has 4/20. So we can give the executor
 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of
 the two parameters.

 Either way would allow me to get my heterogenous hardware all
 participating in the work of my spark cluster, presumably without
 endangering spark's assumption of homogenous execution environments in the
 dimensions of memory and cores. If there's any way to do this, please
 enlighten me.





Re: Configuring Spark for heterogeneous hardware

2014-09-17 Thread Victor Tso-Guillen
Hmm, interesting. I'm using standalone mode but I could consider YARN. I'll
have to simmer on that one. Thanks as always, Sean!

On Wed, Sep 17, 2014 at 12:40 AM, Sean Owen so...@cloudera.com wrote:

 I thought I answered this ... you can easily accomplish this with YARN
 by just telling YARN how much memory / CPU each machine has. This can
 be configured in groups too rather than per machine. I don't think you
 actually want differently-sized executors, and so don't need ratios.
 But you can have differently-sized containers which can fit different
 numbers of executors as appropriate.

 On Wed, Sep 17, 2014 at 8:35 AM, Victor Tso-Guillen v...@paxata.com
 wrote:
  I'm supposing that there's no good solution to having heterogenous
 hardware
  in a cluster. What are the prospects of having something like this in the
  future? Am I missing an architectural detail that precludes this
  possibility?
 
  Thanks,
  Victor
 
  On Fri, Sep 12, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com
  wrote:
 
  Ping...
 
  On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com
  wrote:
 
  So I have a bunch of hardware with different core and memory setups. Is
  there a way to do one of the following:
 
  1. Express a ratio of cores to memory to retain. The spark worker
 config
  would represent all of the cores and all of the memory usable for any
  application, and the application would take a fraction that sustains
 the
  ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the
 worker
  take 4/20 and the executor take 5 G for each of the 4 cores, thus
 maxing
  both out. If there were only 16G with the same ratio requirement, it
 would
  only take 3 cores and 12G in a single executor and leave the rest.
 
  2. Have the executor take whole number ratios of what it needs. Say it
 is
  configured for 2/8G and the worker has 4/20. So we can give the
 executor
  2/8G (which is true now) or we can instead give it 4/16G, maxing out
 one of
  the two parameters.
 
  Either way would allow me to get my heterogenous hardware all
  participating in the work of my spark cluster, presumably without
  endangering spark's assumption of homogenous execution environments in
 the
  dimensions of memory and cores. If there's any way to do this, please
  enlighten me.
 
 
 



Re: Configuring Spark for heterogeneous hardware

2014-09-12 Thread Victor Tso-Guillen
Ping...

On Thu, Sep 11, 2014 at 5:44 PM, Victor Tso-Guillen v...@paxata.com wrote:

 So I have a bunch of hardware with different core and memory setups. Is
 there a way to do one of the following:

 1. Express a ratio of cores to memory to retain. The spark worker config
 would represent all of the cores and all of the memory usable for any
 application, and the application would take a fraction that sustains the
 ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker
 take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing
 both out. If there were only 16G with the same ratio requirement, it would
 only take 3 cores and 12G in a single executor and leave the rest.

 2. Have the executor take whole number ratios of what it needs. Say it is
 configured for 2/8G and the worker has 4/20. So we can give the executor
 2/8G (which is true now) or we can instead give it 4/16G, maxing out one of
 the two parameters.

 Either way would allow me to get my heterogenous hardware all
 participating in the work of my spark cluster, presumably without
 endangering spark's assumption of homogenous execution environments in the
 dimensions of memory and cores. If there's any way to do this, please
 enlighten me.



Backwards RDD

2014-09-11 Thread Victor Tso-Guillen
Iterating an RDD gives you each partition in order of their split index.
I'd like to be able to get each partition in reverse order, but I'm having
difficulty implementing the compute() method. I thought I could do
something like this:

  override def getDependencies: Seq[Dependency[_]] = {
Seq(new NarrowDependency[T](prev) {
  def getParents(partitionId: Int): Seq[Int] = {
Seq(prev.partitions.size - partitionId - 1)
  }
})
  }

  override def compute(split: Partition, context: TaskContext): Iterator[T]
= {
firstParent[T].iterator(split, context).toArray.reverseIterator
  }

But that doesn't work. How do I get one split to depend on exactly one
split from the parent that does not match indices?
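
One possible direction, sketched here untested (and, like the compute() above,
it still materializes each parent partition in memory): give the child RDD its
own Partition objects that wrap the reversed parent partitions, and have
compute() iterate the wrapped partition instead of the one with the matching
index.

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

private case class ReversedPartition(index: Int, parent: Partition) extends Partition

class ReversedRDD[T: ClassTag](prev: RDD[T]) extends RDD[T](prev.context, Nil) {
  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency[T](prev) {
      def getParents(partitionId: Int): Seq[Int] =
        Seq(prev.partitions.length - partitionId - 1)
    })

  override protected def getPartitions: Array[Partition] =
    prev.partitions.indices.map { i =>
      ReversedPartition(i, prev.partitions(prev.partitions.length - i - 1)): Partition
    }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split.asInstanceOf[ReversedPartition].parent, context)
      .toArray.reverseIterator
}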


Configuring Spark for heterogeneous hardware

2014-09-11 Thread Victor Tso-Guillen
So I have a bunch of hardware with different core and memory setups. Is
there a way to do one of the following:

1. Express a ratio of cores to memory to retain. The spark worker config
would represent all of the cores and all of the memory usable for any
application, and the application would take a fraction that sustains the
ratio. Say I have 4 cores and 20G of RAM. I'd like it to have the worker
take 4/20 and the executor take 5 G for each of the 4 cores, thus maxing
both out. If there were only 16G with the same ratio requirement, it would
only take 3 cores and 12G in a single executor and leave the rest.

2. Have the executor take whole number ratios of what it needs. Say it is
configured for 2/8G and the worker has 4/20. So we can give the executor
2/8G (which is true now) or we can instead give it 4/16G, maxing out one of
the two parameters.

Either way would allow me to get my heterogeneous hardware all participating
in the work of my Spark cluster, presumably without endangering Spark's
assumption of homogeneous execution environments in the dimensions of memory
and cores. If there's any way to do this, please enlighten me.


Re: prepending jars to the driver class path for spark-submit on YARN

2014-09-06 Thread Victor Tso-Guillen
I ran into the same issue. What I did was use the Maven Shade plugin to shade
my version of the httpcomponents libraries into another package.


On Fri, Sep 5, 2014 at 4:33 PM, Penny Espinoza 
pesp...@societyconsulting.com wrote:

  Hey - I’m struggling with some dependency issues with
 org.apache.httpcomponents httpcore and httpclient when using spark-submit
 with YARN running Spark 1.0.2 on a Hadoop 2.2 cluster.  I’ve seen several
 posts about this issue, but no resolution.

  The error message is this:


 Caused by: java.lang.NoSuchMethodError: org.apache.http.impl.conn.DefaultClientConnectionOperator.<init>(Lorg/apache/http/conn/scheme/SchemeRegistry;Lorg/apache/http/conn/DnsResolver;)V
     at org.apache.http.impl.conn.PoolingClientConnectionManager.createConnectionOperator(PoolingClientConnectionManager.java:140)
     at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:114)
     at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:99)
     at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:85)
     at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:93)
     at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
     at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
     at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:155)
     at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:118)
     at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:102)
     at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:332)
     at com.oncue.rna.realtime.streaming.config.package$.transferManager(package.scala:76)
     at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry.<init>(SchemaRegistry.scala:27)
     at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry$lzycompute(SchemaRegistry.scala:46)
     at com.oncue.rna.realtime.streaming.models.S3SchemaRegistry$.schemaRegistry(SchemaRegistry.scala:44)
     at com.oncue.rna.realtime.streaming.coders.KafkaAvroDecoder.<init>(KafkaAvroDecoder.scala:20)
     ... 17 more

   The apache httpcomponents libraries include the method above as of
 version 4.2.  The Spark 1.0.2 binaries seem to include version 4.1.

  I can get this to work in my driver program by adding exclusions to
 force use of 4.1, but then I get the error in tasks even when using the
 —jars option of the spark-submit command.  How can I get both the driver
 program and the individual tasks in my spark-streaming job to use the same
 version of this library so my job will run all the way through?

  thanks
 p



Re: Accessing neighboring elements in an RDD

2014-09-03 Thread Victor Tso-Guillen
Interestingly, there was an almost identical question posed on Aug 22 by
cjwang. Here's the link to the archive:
http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-td12621.html#a12664


On Wed, Sep 3, 2014 at 10:33 AM, Daniel, Ronald (ELS-SDG) 
r.dan...@elsevier.com wrote:

 Hi all,

 Assume I have read the lines of a text file into an RDD:

 textFile = sc.textFile("SomeArticle.txt")

 Also assume that the sentence breaks in SomeArticle.txt were done by
 machine and have some errors, such as the break at Fig. in the sample text
 below.

 Index   Text
 N...as shown in Fig.
 N+1 1.
 N+2 The figure shows...

 What I want is an RDD with:

 N   ... as shown in Fig. 1.
 N+1 The figure shows...

 Is there some way a filter() can look at neighboring elements in an RDD?
 That way I could look, in parallel, at neighboring elements in an RDD and
 come up with a new RDD that may have a different number of elements.  Or do
 I just have to sequentially iterate through the RDD?

 Thanks,
 Ron





Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-09-02 Thread Victor Tso-Guillen
I'm pretty sure the issue was an interaction with another subsystem. Thanks
for your patience with me!


On Tue, Sep 2, 2014 at 10:05 AM, Sean Owen so...@cloudera.com wrote:

 +user@

 An executor is specific to an application, but an application can be
 executing many jobs at once. So as I understand many jobs' tasks can
 be executing at once on an executor.

 You may not use your full 80-way parallelism if, for example, your
 data set doesn't have 80 partitions. I also believe Spark will not
 necessarily spread the load over executors, instead preferring to
 respect data and rack locality if possible. Those are two reasons you
 might see only 4 executors active. If you mean only 4 executors exist
 at all, is it possible the other 4 can't provide the memory you're
 asking for?


 On Tue, Sep 2, 2014 at 5:56 PM, Victor Tso-Guillen v...@paxata.com
 wrote:
  Actually one more question, since in preliminary runs I wasn't sure if I
  understood what's going on. Are the cores allocated to an executor able
 to
  execute tasks for different jobs simultaneously, or just for one job at a
  time? I have 10 workers with 8 cores each, and it appeared that one job
 got
  four executors at once, then four more later on. The system wasn't
 anywhere
  near saturation of 80 cores so I would've expected all 8 cores to be
 running
  simultaneously.
 
  If there's value to these questions, please reply back to the list.
 
 
  On Tue, Sep 2, 2014 at 6:58 AM, Victor Tso-Guillen v...@paxata.com
 wrote:
 
  Thank you for the help, guys. So as I expected, I didn't fully
 understand
  the options. I had SPARK_WORKER_CORES set to 1 because I did not realize
  that by setting to > 1 it would mean an executor could operate on
 multiple
  tasks simultaneously. I just thought it was a hint to Spark that that
  executor could be expected to use that many threads, but otherwise I
 had not
  understood that it affected the scheduler that way. Thanks!
 
 
  On Sun, Aug 31, 2014 at 9:28 PM, Matei Zaharia matei.zaha...@gmail.com
 
  wrote:
 
 
  Hey Victor,
 
  As Sean said, executors actually execute multiple tasks at a time. The
  only reasons they wouldn't are either (1) if you launched an executor
 with
  just 1 core (you can configure how many cores the executors will use
 when
  you set up your Worker, or it will look at your system by default) or
 (2) if
  your tasks are acquiring some kind of global lock, so only one can run
 at a
  time.
 
  To test this, do the following:
  - Launch your standalone cluster (you can do it on just one machine by
  adding just localhost in the slaves file)
  - Go to http://:4040 and look at the worker list. Do you see workers
 with
  more than 1 core? If not, you need to launch the workers by hand or set
  SPARK_WORKER_CORES in conf/spark-env.sh.
  - Run your application. Make sure it has enough pending tasks for your
  cores in the driver web UI (http://:4040), and if so, jstack one of
 the
  CoarseGrainedExecutor processes on a worker to see what the threads are
  doing. (Look for threads that contain TaskRunner.run in them)
 
  You can also try a simple CPU-bound job that launches lots of tasks
 like
  this to see that all cores are being used:
 
  sc.parallelize(1 to 1000, 1000).map(_ => (1 to 20).product).count()
 
  Each task here takes 1-2 seconds to execute and there are 1000 of them
 so
  it should fill up your cluster.
 
  Matei
 
 
 
  On August 31, 2014 at 9:18:02 PM, Victor Tso-Guillen
  (v...@paxata.com(mailto:v...@paxata.com)) wrote:
 
   I'm pretty sure my terminology matches that doc except the doc makes
 no
   explicit mention of machines. In standalone mode, you can spawn
 multiple
   workers on a single machine and each will babysit one executor (per
   application). In my observation as well each executor can be
 assigned many
   tasks but operates on one at a time. If there's a way to have it
 execute in
   multiple tasks simultaneously in a single VM can you please show me
 how?
   Maybe I'm missing the requisite configuration options, no matter how
 common
   or trivial...
  
   On Sunday, August 31, 2014, Sean Owen wrote:
The confusion may be your use of 'worker', which isn't matching
 what
'worker' means in Spark. Have a look at
https://spark.apache.org/docs/latest/cluster-overview.html Of
 course
one VM can run many tasks at once; that's already how Spark works.
   
On Sun, Aug 31, 2014 at 4:52 AM, Victor Tso-Guillen wrote:
 I might not be making myself clear, so sorry about that. I
 understand that a
 machine can have as many spark workers as you'd like, for example
 one per
 core. A worker may be assigned to a pool for one or more
 applications, but
 for a single application let's just say a single worker will have
 at most a
 single executor. An executor can be assigned multiple tasks in
 its
 queue,
 but will work on one task at a time only.

 In local mode, you can specify the number

Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
I'm thinking of local mode where multiple virtual executors occupy the same
vm. Can we have the same configuration in spark standalone cluster mode?


Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
Standalone. I'd love to tell it that my one executor can simultaneously
serve, say, 16 tasks at once for an arbitrary number of distinct jobs.


On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Yes, executors run one task per core of your machine by default. You can
 also manually launch them with more worker threads than you have cores.
 What cluster manager are you on?

 Matei

 On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

  I'm thinking of local mode where multiple virtual executors occupy the
 same vm. Can we have the same configuration in spark standalone cluster
 mode?




Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
Any more thoughts on this? I'm not sure how to do this yet.


On Fri, Aug 29, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com
wrote:

 Standalone. I'd love to tell it that my one executor can simultaneously
 serve, say, 16 tasks at once for an arbitrary number of distinct jobs.


 On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Yes, executors run one task per core of your machine by default. You can
 also manually launch them with more worker threads than you have cores.
 What cluster manager are you on?

 Matei

 On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

  I'm thinking of local mode where multiple virtual executors occupy the
 same vm. Can we have the same configuration in spark standalone cluster
 mode?





Re: Upgrading 1.0.0 to 1.0.2

2014-08-27 Thread Victor Tso-Guillen
Ah, thanks.


On Tue, Aug 26, 2014 at 7:32 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

 Hi, Victor,

 the issue with having different versions in the driver and the cluster is that
 the master will shut down your application due to an inconsistent
 serialVersionUID in ExecutorState

 Best,

 --
 Nan Zhu

 On Tuesday, August 26, 2014 at 10:10 PM, Matei Zaharia wrote:

 Things will definitely compile, and apps compiled on 1.0.0 should even be
 able to link against 1.0.2 without recompiling. The only problem is if you
 run your driver with 1.0.0 on its classpath, but the cluster has 1.0.2 in
 executors.

 For Mesos and YARN vs standalone, the difference is that they just have
 more features, at the expense of more complicated setup. For example, they
 have richer support for cross-application sharing (see
 https://spark.apache.org/docs/latest/job-scheduling.html), and the
 ability to run non-Spark applications on the same cluster.

 Matei

 On August 26, 2014 at 6:53:33 PM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

 Yes, we are standalone right now. Do you have literature why one would
 want to consider Mesos or YARN for Spark deployments?

 Sounds like I should try upgrading my project and seeing if everything
 compiles without modification. Then I can connect to an existing 1.0.0
 cluster and see what what happens...

 Thanks, Matei :)


 On Tue, Aug 26, 2014 at 6:37 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

  Is this a standalone mode cluster? We don't currently make this
 guarantee, though it will likely work in 1.0.0 to 1.0.2. The problem though
 is that the standalone mode grabs the executors' version of Spark code from
 what's installed on the cluster, while your driver might be built against
 another version. On YARN and Mesos, you can more easily mix different
 versions of Spark, since each application ships its own Spark JAR (or
 references one from a URL), and this is used for both the driver and
 executors.

  Matei

 On August 26, 2014 at 6:10:57 PM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

  I wanted to make sure that there's full compatibility between minor
 releases. I have a project that has a dependency on spark-core so that it
 can be a driver program and that I can test locally. However, when
 connecting to a cluster you don't necessarily know what version you're
 connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver
 program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?






Re: What is a Block Manager?

2014-08-27 Thread Victor Tso-Guillen
I have long-lived state I'd like to maintain on the executors, initialized
during some bootstrap phase, and I'd like to update the master when such an
executor leaves the cluster.
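
Concretely, something like this untested sketch: it leans on the block-manager
add/remove events as a stand-in for executor membership, and the class and
method names are illustrative.

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved}

class ExecutorTracker extends SparkListener {
  private val live = mutable.Set[String]()

  override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded): Unit =
    synchronized { live += added.blockManagerId.executorId }   // bootstrap hook could go here

  override def onBlockManagerRemoved(removed: SparkListenerBlockManagerRemoved): Unit =
    synchronized { live -= removed.blockManagerId.executorId } // tell the master this executor left

  def liveExecutors: Set[String] = synchronized { live.toSet }
}

// registration on the driver, given a SparkContext sc:
// sc.addSparkListener(new ExecutorTracker())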


On Tue, Aug 26, 2014 at 11:18 PM, Liu, Raymond raymond@intel.com
wrote:

 The framework have those info to manage cluster status, and these info
 (e.g. worker number) is also available through spark metrics system.
 While from the user application's point of view, can you give an example
 why you need these info, what would you plan to do with them?

 Best Regards,
 Raymond Liu

 From: Victor Tso-Guillen [mailto:v...@paxata.com]
 Sent: Wednesday, August 27, 2014 1:40 PM
 To: Liu, Raymond
 Cc: user@spark.apache.org
 Subject: Re: What is a Block Manager?

 We're a single-app deployment so we want to launch as many executors as
 the system has workers. We accomplish this by not configuring the max for
 the application. However, is there really no way to inspect what
 machines/executor ids/number of workers/etc is available in context? I'd
 imagine that there'd be something in the SparkContext or in the listener,
 but all I see in the listener is block managers getting added and removed.
 Wouldn't one care about the workers getting added and removed at least as
 much as for block managers?

 On Tue, Aug 26, 2014 at 6:58 PM, Liu, Raymond raymond@intel.com
 wrote:
 Basically, a Block Manager manages the storage for most of the data in
 spark, name a few: block that represent a cached RDD partition,
 intermediate shuffle data, broadcast data etc. it is per executor, while in
 standalone mode, normally, you have one executor per worker.

 You don't control how many worker you have at runtime, but you can somehow
 manage how many executors your application will launch  Check different
 running mode's documentation for details  ( but control where? Hardly, yarn
 mode did some works based on data locality, but this is done by framework
 not user program).

 Best Regards,
 Raymond Liu

 From: Victor Tso-Guillen [mailto:v...@paxata.com]
 Sent: Tuesday, August 26, 2014 11:42 PM
 To: user@spark.apache.org
 Subject: What is a Block Manager?

 I'm curious not only about what they do, but what their relationship is to
 the rest of the system. I find that I get listener events for n block
 managers added where n is also the number of workers I have available to
 the application. Is this a stable constant?

 Also, are there ways to determine at runtime how many workers I have and
 where they are?

 Thanks,
 Victor




What is a Block Manager?

2014-08-26 Thread Victor Tso-Guillen
I'm curious not only about what they do, but what their relationship is to
the rest of the system. I find that I get listener events for n block
managers added where n is also the number of workers I have available to
the application. Is this a stable constant?

Also, are there ways to determine at runtime how many workers I have and
where they are?

Thanks,
Victor


Upgrading 1.0.0 to 1.0.2

2014-08-26 Thread Victor Tso-Guillen
I wanted to make sure that there's full compatibility between minor
releases. I have a project that has a dependency on spark-core so that it
can be a driver program and that I can test locally. However, when
connecting to a cluster you don't necessarily know what version you're
connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver
program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?


Re: Upgrading 1.0.0 to 1.0.2

2014-08-26 Thread Victor Tso-Guillen
Yes, we are standalone right now. Do you have literature why one would want
to consider Mesos or YARN for Spark deployments?

Sounds like I should try upgrading my project and seeing if everything
compiles without modification. Then I can connect to an existing 1.0.0
cluster and see what happens...

Thanks, Matei :)


On Tue, Aug 26, 2014 at 6:37 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Is this a standalone mode cluster? We don't currently make this guarantee,
 though it will likely work in 1.0.0 to 1.0.2. The problem though is that
 the standalone mode grabs the executors' version of Spark code from what's
 installed on the cluster, while your driver might be built against another
 version. On YARN and Mesos, you can more easily mix different versions of
 Spark, since each application ships its own Spark JAR (or references one
 from a URL), and this is used for both the driver and executors.

 Matei

 On August 26, 2014 at 6:10:57 PM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

  I wanted to make sure that there's full compatibility between minor
 releases. I have a project that has a dependency on spark-core so that it
 can be a driver program and that I can test locally. However, when
 connecting to a cluster you don't necessarily know what version you're
 connecting to. Is a 1.0.0 cluster binary compatible with a 1.0.2 driver
 program? Is a 1.0.0 driver program binary compatible with a 1.0.2 cluster?




Re: What is a Block Manager?

2014-08-26 Thread Victor Tso-Guillen
We're a single-app deployment so we want to launch as many executors as the
system has workers. We accomplish this by not configuring the max for the
application. However, is there really no way to inspect what
machines/executor ids/number of workers/etc is available in context? I'd
imagine that there'd be something in the SparkContext or in the listener,
but all I see in the listener is block managers getting added and removed.
Wouldn't one care about the workers getting added and removed at least as
much as for block managers?
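
For what it's worth, one driver-side call that does exist is
SparkContext.getExecutorMemoryStatus; a minimal sketch, assuming sc is the
live context:

// keys are "host:port" strings, values are (max memory, remaining memory) in bytes
val executors: Map[String, (Long, Long)] = sc.getExecutorMemoryStatus
executors.foreach { case (hostPort, (maxMem, remaining)) =>
  println(s"block manager at $hostPort: $remaining of $maxMem bytes free")
}
println(s"${executors.size} block managers reported (the driver's is included)")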


On Tue, Aug 26, 2014 at 6:58 PM, Liu, Raymond raymond@intel.com wrote:

 Basically, a Block Manager manages the storage for most of the data in
 spark, name a few: block that represent a cached RDD partition,
 intermediate shuffle data, broadcast data etc. it is per executor, while in
 standalone mode, normally, you have one executor per worker.

 You don't control how many worker you have at runtime, but you can somehow
 manage how many executors your application will launch  Check different
 running mode's documentation for details  ( but control where? Hardly, yarn
 mode did some works based on data locality, but this is done by framework
 not user program).

 Best Regards,
 Raymond Liu

 From: Victor Tso-Guillen [mailto:v...@paxata.com]
 Sent: Tuesday, August 26, 2014 11:42 PM
 To: user@spark.apache.org
 Subject: What is a Block Manager?

 I'm curious not only about what they do, but what their relationship is to
 the rest of the system. I find that I get listener events for n block
 managers added where n is also the number of workers I have available to
 the application. Is this a stable constant?

 Also, are there ways to determine at runtime how many workers I have and
 where they are?

 Thanks,
 Victor



Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD

2014-08-25 Thread Victor Tso-Guillen
Do you want to do this on one column or all numeric columns?


On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet vineet.hingor...@sap.com
 wrote:

  Hello all,

 Could someone help me with the manipulation of csv file data. I have
 'semicolon' separated csv data including doubles and strings. I want to
 calculate the maximum/average of a column. When I read the file using
 sc.textFile("test.csv").map(_.split(";")), each field is read as a string. Could
 someone help me with the above manipulation and how to do that.

 Or maybe if there is some way to take the transpose of the data and then
 manipulating the rows in some way?

 Thank you in advance, I am struggling with this thing for quite sometime

 Regards,
 Vineet



Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD

2014-08-25 Thread Victor Tso-Guillen
Assuming the CSV is well-formed (every row has the same number of columns)
and every column is a number, this is how you can do it. You can adjust so
that you pick just the columns you want, of course, by mapping each row to
a new Array that contains just the column values you want. Just be sure the
logic selects the same columns for every row or your stats might look funny.

import org.apache.spark.util.StatCounter

val rdd: RDD[Array[Double]] = ???
rdd.mapPartitions(vs => {
  // one StatCounter per column, computed within each partition
  Iterator(vs.toArray.transpose.map(StatCounter(_)))
}).reduce((as, bs) => as.zipWithIndex.map {
  case (a, i) => a.merge(bs(i)) // merge the per-partition column stats
})
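
To plug in the semicolon-separated file from the question (a sketch, assuming
an existing SparkContext named sc and that the selected fields all parse as
doubles):

import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

val file = sc.textFile("test.csv")
val data: RDD[Array[Double]] = file.map(_.split(";").map(_.toDouble))
val stats = data.mapPartitions(vs => Iterator(vs.toArray.transpose.map(StatCounter(_))))
  .reduce((as, bs) => as.zipWithIndex.map { case (a, i) => a.merge(bs(i)) })
stats(1).mean // average of the second column
stats(1).max  // maximum of the second column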



On Mon, Aug 25, 2014 at 9:50 AM, Hingorani, Vineet vineet.hingor...@sap.com
 wrote:

  Hello Victor,



 I want to do it on multiple columns. I was able to do it on one column by
 the help of Sean using code below.



 val matData = file.map(_.split(";"))

 val stats = matData.map(_(2).toDouble).stats()

 stats.mean

 stats.max



 Thank you



 Vineet



 *From:* Victor Tso-Guillen [mailto:v...@paxata.com]
 *Sent:* Montag, 25. August 2014 18:34
 *To:* Hingorani, Vineet
 *Cc:* user@spark.apache.org
 *Subject:* Re: Manipulating columns in CSV file or Transpose of
 Array[Array[String]] RDD



 Do you want to do this on one column or all numeric columns?



 On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet 
 vineet.hingor...@sap.com wrote:

 Hello all,

 Could someone help me with the manipulation of csv file data. I have
 'semicolon' separated csv data including doubles and strings. I want to
 calculate the maximum/average of a column. When I read the file using
 sc.textFile("test.csv").map(_.split(";")), each field is read as a string. Could
 someone help me with the above manipulation and how to do that.

 Or maybe if there is some way to take the transpose of the data and then
 manipulating the rows in some way?

 Thank you in advance, I am struggling with this thing for quite sometime

 Regards,
 Vineet





Re: Finding previous and next element in a sorted RDD

2014-08-23 Thread Victor Tso-Guillen
Using mapPartitions, you could get the neighbors within a partition, but if
you think about it, it's much more difficult to accomplish this for the
complete dataset.
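
Within one partition it looks roughly like this untested sketch; elements at
partition boundaries get None on one side, which is exactly the hard part:

import org.apache.spark.rdd.RDD

def withNeighbours(rdd: RDD[Int]): RDD[(Option[Int], Int, Option[Int])] =
  rdd.mapPartitions { it =>
    val elems = it.toVector // materializes the partition
    elems.zipWithIndex.map { case (e, i) =>
      (if (i > 0) Some(elems(i - 1)) else None,
       e,
       if (i < elems.size - 1) Some(elems(i + 1)) else None)
    }.iterator
  }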


On Fri, Aug 22, 2014 at 11:24 AM, cjwang c...@cjwang.us wrote:

 It would be nice if an RDD that was massaged by OrderedRDDFunction could
 know
 its neighbors.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Finding-previous-and-next-element-in-a-sorted-RDD-tp12621p12664.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: FetchFailed when collect at YARN cluster

2014-08-23 Thread Victor Tso-Guillen
I think I emailed about a similar issue, but in standalone mode. I haven't
investigated much so I don't know what's a good fix.


On Fri, Aug 22, 2014 at 12:00 PM, Jiayu Zhou dearji...@gmail.com wrote:

 Hi,

 I am having this FetchFailed issue when the driver is about to collect
 about
 2.5M lines of short strings (about 10 characters each line) from a YARN
 cluster with 400 nodes:

 *14/08/22 11:43:27 WARN scheduler.TaskSetManager: Lost task 205.0 in stage
 0.0 (TID 1228, aaa.xxx.com): FetchFailed(BlockManagerId(220, aaa.xxx.com,
 37899, 0), shuffleId=0, mapId=420, reduceId=205)
 14/08/22 11:43:27 WARN scheduler.TaskSetManager: Lost task 603.0 in stage
 0.0 (TID 1626, aaa.xxx.com): FetchFailed(BlockManagerId(220, aaa.xxx.com,
 37899, 0), shuffleId=0, mapId=420, reduceId=603)*

 And other than this FetchFailed, I am not able to see anything else from
 the
 log file (no OOM errors shown).

 This does not happen when there is only 2M lines. I guess it might because
 of the akka message size, and then I used the following

 spark.akka.frameSize  100
 spark.akka.timeout  200

 And that does not help as well. Has anyone experienced similar problems?

 Thanks,
 Jiayu



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailed-when-collect-at-YARN-cluster-tp12670.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: FetchFailed when collect at YARN cluster

2014-08-23 Thread Victor Tso-Guillen
I did not try the Akka configs. I was doing a shuffle operation, I believe
a sort, but two copies of the operation at the same time. It was a 20M row
dataset of reasonable horizontal size.


On Sat, Aug 23, 2014 at 2:23 PM, Jiayu Zhou dearji...@gmail.com wrote:

 I saw your post. What are the operations you did? Are you trying to collect
 data from driver? Did you try the akka configurations?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/FetchFailed-when-collect-at-YARN-cluster-tp12670p12703.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




FetchFailedException from Block Manager

2014-08-22 Thread Victor Tso-Guillen
Anyone know why I would see this in a bunch of executor logs? Is it just
classical overloading of the cluster network, OOM, or something else? If
anyone's seen this before, what do I need to tune to make some headway here?

Thanks,
Victor


Caused by: org.apache.spark.FetchFailedException: Fetch failed: BlockManagerId(116, xxx, 54761, 0) 110 32 38
    at org.apache.spark.BlockStoreShuffleFetcher.org$apache$spark$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:77)
    at org.apache.spark.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:107)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$4.apply(PairRDDFunctions.scala:106)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)


Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-20 Thread Victor Tso-Guillen
How about this:

val prev: RDD[V] = rdd.mapPartitions(partition => { /*setup()*/; partition })
new RDD[V](prev) {
  protected def getPartitions = prev.partitions

  def compute(split: Partition, context: TaskContext) = {
    context.addOnCompleteCallback(() => { /*cleanup()*/ })
    firstParent[V].iterator(split, context)
  }
}
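
And the foreachPartition() route Sean describes below, for the save-to-a-DB
case, is roughly this (untested; Connection and connect() are stand-ins for
whatever client is actually used):

import org.apache.spark.rdd.RDD

// hypothetical client API, purely for illustration
trait Connection { def insert(row: String): Unit; def close(): Unit }
def connect(): Connection = ???

def saveToDb(rdd: RDD[String]): Unit =
  rdd.foreachPartition { rows =>
    val conn = connect()                       // setup, once per partition
    try rows.foreach(row => conn.insert(row))  // consume the iterator
    finally conn.close()                       // cleanup, even on failure
  }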


On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen so...@cloudera.com wrote:

 I think you're looking for foreachPartition(). You've kinda hacked it
 out of mapPartitions(). Your case has a simple solution, yes. After
 saving to the DB, you know you can close the connection, since you
 know the use of the connection has definitely just finished. But it's
 not a simpler solution for mapPartitions() since that's not really
 what you are using :)

 In general, mapPartitions creates an Iterator from another Iterator.
 Of course you could consume the input iterator, open the connection,
 perform operations, close the connection and return an iterator over
 the result. That works, but requires reading the entire input no
 matter what, and, reading it into memory. These may not be OK in all
 cases.

 Where possible, it's nicest to return an Iterator that accesses the
 source Iterator only as needed to produce elements. This means
 returning that Iterator before any work has been done. So you have to
 close the connection later when the Iterator has been exhausted.
 Really Tobias's method is trying to shim in a cleanup() lifecycle
 method into the Iterator. I suppose it could be done a little more
 cleanly using Guava's Iterator library, which would give you a more
 explicit way to execute something when done.


 On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:
  Sean, would this work --
 
  rdd.mapPartitions { partition = Iterator(partition) }.foreach(
 
 // Some setup code here
 // save partition to DB
 // Some cleanup code here
  )
 
 
  I tried a pretty simple example ... I can see that the setup and cleanup
 are
  executed on the executor node, once per partition (I used
  mapPartitionWithIndex instead of mapPartition to track this a little
  better). Seems like an easier solution than Tobias's but I'm wondering if
  it's perhaps incorrect

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: a noob question for how to implement setup and cleanup in Spark map

2014-08-20 Thread Victor Tso-Guillen
And duh, of course, you can do the setup in that new RDD as well :)


On Wed, Aug 20, 2014 at 1:59 AM, Victor Tso-Guillen v...@paxata.com wrote:

 How about this:

 val prev: RDD[V] = rdd.mapPartitions(partition = { /*setup()*/; partition
 })
 new RDD[V](prev) {
   protected def getPartitions = prev.partitions

   def compute(split: Partition, context: TaskContext) = {
 context.addOnCompleteCallback(() = /*cleanup()*/)
 firstParent[V].iterator(split, context)
   }
 }


 On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen so...@cloudera.com wrote:

 I think you're looking for foreachPartition(). You've kinda hacked it
 out of mapPartitions(). Your case has a simple solution, yes. After
 saving to the DB, you know you can close the connection, since you
 know the use of the connection has definitely just finished. But it's
 not a simpler solution for mapPartitions() since that's not really
 what you are using :)

 In general, mapPartitions creates an Iterator from another Iterator.
 Of course you could consume the input iterator, open the connection,
 perform operations, close the connection and return an iterator over
 the result. That works, but requires reading the entire input no
 matter what, and, reading it into memory. These may not be OK in all
 cases.

 Where possible, it's nicest to return an Iterator that accesses the
 source Iterator only as needed to produce elements. This means
 returning that Iterator before any work has been done. So you have to
 close the connection later when the Iterator has been exhausted.
 Really Tobias's method is trying to shim in a cleanup() lifecycle
 method into the Iterator. I suppose it could be done a little more
 cleanly using Guava's Iterator library, which would give you a more
 explicit way to execute something when done.


 On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:
  Sean, would this work --
 
  rdd.mapPartitions { partition = Iterator(partition) }.foreach(
 
 // Some setup code here
 // save partition to DB
 // Some cleanup code here
  )
 
 
  I tried a pretty simple example ... I can see that the setup and
 cleanup are
  executed on the executor node, once per partition (I used
  mapPartitionWithIndex instead of mapPartition to track this a little
  better). Seems like an easier solution than Tobias's but I'm wondering
 if
  it's perhaps incorrect

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org