Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks RK. I can turn on speculative execution but I am trying to find out the 
actual reason for the delay, as it happens on any node. Any idea about the stack 
trace in my previous mail?
Regards,
Ajay
 

 On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:
   

 If you don't want a few slow tasks to slow down the entire job, you can turn 
on speculation. 
Here are the speculation settings from Spark Configuration - Spark 1.2.0 
Documentation.



| spark.speculation | false | If set to true, performs speculative execution 
of tasks. This means if one or more tasks are running slowly in a stage, they 
will be re-launched. |
| spark.speculation.interval | 100 | How often Spark will check for tasks to 
speculate, in milliseconds. |
| spark.speculation.quantile | 0.75 | Percentage of tasks which must be 
complete before speculation is enabled for a particular stage. |
| spark.speculation.multiplier | 1.5 | How many times slower a task is than the 
median to be considered for speculation. |
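For reference, a minimal sketch of turning these properties on programmatically (the values shown 
are just the defaults from the table above; passing them to spark-submit with --conf works as well):

import org.apache.spark.{SparkConf, SparkContext}

// Enable speculative execution and tune how aggressively stragglers are re-launched.
val conf = new SparkConf()
  .setAppName("speculation-example")
  .set("spark.speculation", "true")
  .set("spark.speculation.interval", "100")    // check for stragglers every 100 ms
  .set("spark.speculation.quantile", "0.75")   // wait until 75% of tasks in a stage finish
  .set("spark.speculation.multiplier", "1.5")  // consider tasks 1.5x slower than the median

val sc = new SparkContext(conf)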

  

 On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava 
a_k_srivast...@yahoo.com.INVALID wrote:
   

 Hi,
My spark job is taking a long time. I see that some tasks are taking a longer time 
for the same amount of data and shuffle read/write. What could be the possible 
reasons for it?
The thread dump sometimes shows that all the tasks in an executor are waiting 
with the following stack trace -
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  0x7fd0aee82e00 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve the job time will be appreciated.
Regards,
Ajay





   

Re: small error in the docs?

2015-01-15 Thread Sean Owen
Yes that's a typo. The API docs and source code are correct though.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

That and your IDE should show the correct signature. You can open a PR
to fix the typo in
https://spark.apache.org/docs/latest/programming-guide.html
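A tiny sketch of the signature in question, assuming an existing SparkContext sc as in the shell:

// cogroup returns RDD[(K, (Iterable[V], Iterable[W]))]: the two iterables are
// wrapped in an inner tuple, not flattened into the outer one.
val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))

val grouped: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[String]))] =
  left.cogroup(right)

grouped.collect().foreach { case (k, (vs, ws)) =>
  println(s"$k -> ${vs.toList} / ${ws.toList}")
}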

On Thu, Jan 15, 2015 at 4:43 PM, kirillfish k.rybac...@datacentric.ru wrote:
 cogroup() function seems to return (K, (Iterable<V>, Iterable<W>)), rather
 than (K, Iterable<V>, Iterable<W>), as pointed out in the docs (at
 least for version 1.1.0):

 https://spark.apache.org/docs/1.1.0/programming-guide.html

 This simple discrepancy cost me half a day of debugging and frustration.
 Kirill




Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Nitin kak
Replying to all

Is this Overhead memory allocation used for any specific purpose?

For example, will it be any different if I do *--executor-memory 22G* with
overhead set to 0% (hypothetically) vs
*--executor-memory 20G* and overhead memory at the default (9%), which
eventually brings the total memory asked by Spark to approximately 22G.

On Thu, Jan 15, 2015 at 12:54 PM, Nitin kak nitinkak...@gmail.com wrote:

 Is this Overhead memory allocation used for any specific purpose.

 For example, will it be any different if I do *--executor-memory 22G *with
 overhead set to 0%(hypothetically) vs
 *--executor-memory 20G* and overhead memory to default(9%) which
 eventually brings the total memory asked by Spark to approximately 22G.



 On Thu, Jan 15, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote:

 This is a YARN setting. It just controls how much any container can
 reserve, including Spark executors. That is not the problem.

 You need Spark to ask for more memory from YARN, on top of the memory
 that is requested by --executor-memory. Your output indicates the default
 of 7% is too little. For example you can ask for 20GB for executors and ask
 for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN
 needs to be set to allow containers of at least 22GB!)

 On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote:

 Thanks for sticking to this thread.

 I am guessing what memory my app requests and what Yarn requests on my
 part should be same and is determined by the value of
 *--executor-memory* which I had set to *20G*. Or can the two values be
 different?

 I checked in Yarn configurations(below), so I think that fits well into
 the memory overhead limits.


 Container Memory Maximum
 yarn.scheduler.maximum-allocation-mb
  MiBGiB
 Reset to the default value: 64 GiB
 http://10.1.1.49:7180/cmf/services/108/config#
 Override Instances
 http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue=

 The largest amount of physical memory, in MiB, that can be requested for
 a container.





 On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com
 wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize
 and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the
 same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception
 was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
 
 

Is spark suitable for large scale pagerank, such as 200 million nodes, 2 billion edges?

2015-01-15 Thread txw
Hi,


I am running PageRank on a large dataset, which includes 200 million nodes and 2 
billion edges.
Is Spark suitable for large scale pagerank? How many cores and how much memory do I need, and 
how long will it take?


Thanks


Xuewei Tang

Re: Testing if an RDD is empty?

2015-01-15 Thread Al M
You can also check rdd.partitions.size.  This will be 0 for empty RDDs and 
greater than 0 for RDDs with data.
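A minimal sketch of the check being suggested, assuming an existing SparkContext sc; note that an 
RDD can have a non-zero number of partitions yet hold no elements, so take(1) is the safer test:

val rdd = sc.parallelize(Seq.empty[Int])

val noPartitions = rdd.partitions.size == 0   // the check suggested above
val noElements   = rdd.take(1).isEmpty        // also covers partitions that exist but are empty

println(s"partitions=${rdd.partitions.size}, zeroPartitions=$noPartitions, empty=$noElements")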



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Testing-if-an-RDD-is-empty-tp1678p21170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark crashes on second or third call first() on file

2015-01-15 Thread Davies Liu
What's the version of Spark you are using?

On Wed, Jan 14, 2015 at 12:00 AM, Linda Terlouw linda.terl...@icris.nl wrote:
 I'm new to Spark. When I use the Movie Lens dataset 100k
 (http://grouplens.org/datasets/movielens/), Spark crashes when I run the
 following code. The first call to movieData.first() gives the correct
 result. Depending on which machine I use, it crashed the second or third
 time. Does anybody know why? When I use scala in the Spark shell, this does
 not happen. Scala gives the correct result every time.


 import sys

 sys.path.append("d:/spark/python")

 sys.path.append("d:/spark/python/lib/py4j-0.8.2.1-src.zip")

 from pyspark import SparkContext

 import numpy

 import os


 os.environ["SPARK_HOME"] = "d:/spark"

 sc = SparkContext(appName="testapp")


 movieData = sc.textFile("d:/moviedata/u.data")

 movieData.first()

 movieData.first()

 movieData.first()



 Py4JJavaError: An error occurred while calling
 z:org.apache.spark.api.python.PythonRDD.runJob.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0
 (TID 2, localhost): java.net.SocketException: Connection reset by peer:
 socket write error
 at java.net.SocketOutputStream.socketWrite0(Native Method)
 at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
 at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
 at
 java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
 at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
 at java.io.DataOutputStream.write(DataOutputStream.java:107)
 at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
 at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:592)
 at
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:389)
 at
 org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:388)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:388)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:242)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:204)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1460)
 at
 org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:203)

 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
 at akka.actor.ActorCell.invoke(ActorCell.scala:487)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
 at akka.dispatch.Mailbox.run(Mailbox.scala:220)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Sean Owen
If you give the executor 22GB, it will run with ... -Xmx22g. If the JVM
heap gets nearly full, it will almost certainly consume more than 22GB of
physical memory, because the JVM needs memory for more than just heap. But
in this scenario YARN was only asked for 22GB and it gets killed. This is
exactly what the overhead setting is solving.

The default is 7% not 9%, or 1.4GB for a 20GB executor heap, so a 2GB
overhead is a bump up. It may or may not be sufficient; I just guessed. Any
JVM program with X heap is going to potentially use more than X physical
memory. The overhead setting attempts to account for that so that you
aren't bothered setting both values. But sometimes you need to manually
increase the overhead cushion if you see that YARN kills your program for
using too much physical memory. That's not the same as the JVM running out
of heap.
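For illustration, a rough sketch of that arithmetic; the max(384 MB, 7%) formula is the documented 
Spark 1.2 default for spark.yarn.executor.memoryOverhead, but treat the exact numbers as an 
approximation:

// Estimate the YARN container size Spark will request for a given executor heap.
def requestedContainerMB(executorMemoryMB: Int, overheadMB: Option[Int] = None): Int = {
  val defaultOverhead = math.max(384, (executorMemoryMB * 0.07).toInt)
  executorMemoryMB + overheadMB.getOrElse(defaultOverhead)
}

// 20 GB heap with the default overhead -> roughly 21.4 GB asked of YARN
println(requestedContainerMB(20 * 1024))             // 21913
// 20 GB heap with an explicit 2 GB overhead -> 22 GB asked of YARN
println(requestedContainerMB(20 * 1024, Some(2048))) // 22528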

On Thu, Jan 15, 2015 at 5:54 PM, Nitin kak nitinkak...@gmail.com wrote:

 Is this Overhead memory allocation used for any specific purpose.

 For example, will it be any different if I do *--executor-memory 22G *with
 overhead set to 0%(hypothetically) vs
 *--executor-memory 20G* and overhead memory to default(9%) which
 eventually brings the total memory asked by Spark to approximately 22G.



 On Thu, Jan 15, 2015 at 12:10 PM, Sean Owen so...@cloudera.com wrote:

 This is a YARN setting. It just controls how much any container can
 reserve, including Spark executors. That is not the problem.

 You need Spark to ask for more memory from YARN, on top of the memory
 that is requested by --executor-memory. Your output indicates the default
 of 7% is too little. For example you can ask for 20GB for executors and ask
 for 2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN
 needs to be set to allow containers of at least 22GB!)

 On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote:

 Thanks for sticking to this thread.

 I am guessing what memory my app requests and what Yarn requests on my
 part should be same and is determined by the value of
 *--executor-memory* which I had set to *20G*. Or can the two values be
 different?

 I checked in Yarn configurations(below), so I think that fits well into
 the memory overhead limits.


 Container Memory Maximum
 yarn.scheduler.maximum-allocation-mb
  MiBGiB
 Reset to the default value: 64 GiB
 http://10.1.1.49:7180/cmf/services/108/config#
 Override Instances
 http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue=

 The largest amount of physical memory, in MiB, that can be requested for
 a container.





 On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com
 wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize
 and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the
 same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception
 was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 

Re: Some tasks are taking long time

2015-01-15 Thread Nicos
Ajay,
Unless we are dealing with some synchronization/conditional variable 
bug in Spark, try this per tuning guide:
Cache Size Tuning

One important configuration parameter for GC is the amount of memory that 
should be used for caching RDDs. By default, Spark uses 60% of the configured 
executor memory (spark.executor.memory) to cache RDDs. This means that 40% of 
memory is available for any objects created during task execution.

In case your tasks slow down and you find that your JVM is garbage-collecting 
frequently or running out of memory, lowering this value will help reduce the 
memory consumption. To change this to, say, 50%, you can call 
conf.set("spark.storage.memoryFraction", "0.5") on your SparkConf. Combined 
with the use of serialized caching, using a smaller cache should be sufficient 
to mitigate most of the garbage collection problems. In case you are interested 
in further tuning the Java GC, continue reading below.
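A short sketch of the two knobs mentioned there, with the 0.5 value taken from the quoted example 
and the input path purely a placeholder:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Lower the fraction of executor memory reserved for cached RDDs from the
// default 60% to 50%, leaving more room for objects created during tasks.
val conf = new SparkConf()
  .setAppName("cache-tuning-example")
  .set("spark.storage.memoryFraction", "0.5")

val sc = new SparkContext(conf)

// Combine with serialized caching so cached partitions occupy less heap
// (at the cost of deserializing them on access).
val data = sc.textFile("hdfs:///some/input")
data.persist(StorageLevel.MEMORY_ONLY_SER)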


Complete list of tips here:
https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos

 On Jan 15, 2015, at 6:49 AM, Ajay Srivastava 
 a_k_srivast...@yahoo.com.INVALID wrote:
 
 Thanks RK. I can turn on speculative execution but I am trying to find out 
 actual reason for delay as it happens on any node. Any idea about the stack 
 trace in my previous mail.
 
 Regards,
 Ajay
 
 
 On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:
 
 
 If you don't want a few slow tasks to slow down the entire job, you can turn 
 on speculation. 
 
 Here are the speculation settings from Spark Configuration - Spark 1.2.0 
 Documentation http://spark.apache.org/docs/1.2.0/configuration.html.
  
  
  
  
  
  
 
 spark.speculation (default: false) - If set to true, performs speculative 
 execution of tasks. This means if one or more tasks are running slowly in a 
 stage, they will be re-launched.
 spark.speculation.interval (default: 100) - How often Spark will check for tasks to 
 speculate, in milliseconds.
 spark.speculation.quantile (default: 0.75) - Percentage of tasks which must be 
 complete before speculation is enabled for a particular stage.
 spark.speculation.multiplier (default: 1.5) - How many times slower a task is 
 than the median to be considered for speculation.
 
  
 
 
 On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava 
 a_k_srivast...@yahoo.com.INVALID wrote:
 
 
 Hi,
 
 My spark job is taking long time. I see that some tasks are taking longer 
 time for same amount of data and shuffle read/write. What could be the 
 possible reasons for it ?
 
 The thread-dump sometimes show that all the tasks in an executor are waiting 
 with following stack trace -
 
 Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 
 nid=0x3f85 waiting on condition [0x7fcce3ddc000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7fd0aee82e00 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(Unknown Source)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
  Source)
 at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
 at 
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
 at 
 org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at 
 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
 at 
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at 
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
 at 
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
 at 
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
 at 
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at 
 

using hiveContext to select a nested Map-data-type from an AVROmodel+parquet file

2015-01-15 Thread BB
Hi all,
  Any help on the following is very much appreciated.
=
Problem:
  On a schemaRDD read from a parquet file (data within file uses AVRO model)
using the HiveContext:
 I can't figure out how to 'select', or use a 'where' clause to filter,
rows on a field that has a Map AVRO data type. I want to do the filtering
using a given ('key' : 'value'). How could I do this?

Details:
* the printSchema of the loaded schemaRDD is like so:

-- output snippet -
|-- created: long (nullable = false)
|-- audiences: map (nullable = true)
||-- key: string
||-- value: struct (valueContainsNull = false)
|||-- percent: float (nullable = false)
|||-- cluster: integer (nullable = false)
- 

* I don't get a result when I try to select on a specific value of the
'audiences' field, like so:
 
  SELECT created, audiences FROM mytable_data LATERAL VIEW
explode(audiences) adtab AS adcol WHERE audiences['key']=='tg_loh' LIMIT 10

 sequence of commands on the spark-shell (a different query and output) is:

-- code snippet -
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val parquetFile2 =
hiveContext.parquetFile("/home/myuser/myparquetfile")
scala> parquetFile2.registerTempTable("mytable_data")
scala> hiveContext.cacheTable("mytable_data")

scala> hiveContext.sql("SELECT audiences['key'], audiences['value'] FROM
mytable_data LATERAL VIEW explode(audiences) adu AS audien LIMIT
3").collect().foreach(println)

-- output -
[null,null]
[null,null]
[null,null]


gives a list of nulls. I can see that there is data when I just do the
following (output is truncated):

-- code snippet -
scala> hiveContext.sql("SELECT audiences FROM mytable_data LATERAL VIEW
explode(audiences) tablealias AS colalias LIMIT
1").collect().foreach(println)

 output --
[Map(tg_loh -> [0.0,1,Map()], tg_co -> [0.0,1,Map(tg_co_petrol -> 0.0)],
tg_wall -> [0.0,1,Map(tg_wall_poi -> 0.0)],  ...


Q1) What am I doing wrong?
Q2) How can I use 'where' in the query to filter on specific values?

What works:
   Queries that filter and select on fields with simple AVRO
data types, such as long or string, work fine.
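For what it's worth, plain HiveQL lets you alias both the key and the value of an exploded map in 
the LATERAL VIEW clause itself and filter on the key column; an untested sketch against this schema 
(the aud_key/aud_value aliases are made up, and Spark 1.2's HiveContext may behave differently):

hiveContext.sql("""
  SELECT created, aud_key, aud_value
  FROM mytable_data
  LATERAL VIEW explode(audiences) adu AS aud_key, aud_value
  WHERE aud_key = 'tg_loh'
  LIMIT 10
""").collect().foreach(println)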

===

 I hope the explanation makes sense. Thanks.
Best,
BB



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-hiveContext-to-select-a-nested-Map-data-type-from-an-AVROmodel-parquet-file-tp21168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: dockerized spark executor on mesos?

2015-01-15 Thread Nicholas Chammas
The AMPLab maintains a bunch of Docker files for Spark here:
https://github.com/amplab/docker-scripts

Hasn't been updated since 1.0.0, but might be a good starting point.

On Wed Jan 14 2015 at 12:14:13 PM Josh J joshjd...@gmail.com wrote:

 We have dockerized Spark Master and worker(s) separately and are using it
 in
 our dev environment.


 Is this setup available on github or dockerhub?

 On Tue, Dec 9, 2014 at 3:50 PM, Venkat Subramanian vsubr...@gmail.com
 wrote:

 We have dockerized Spark Master and worker(s) separately and are using it
 in
 our dev environment. We don't use Mesos though, running it in Standalone
 mode, but adding Mesos should not be that difficult I think.

 Regards

 Venkat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dockerized-spark-executor-on-mesos-tp20276p20603.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Sean Owen
This is a YARN setting. It just controls how much any container can
reserve, including Spark executors. That is not the problem.

You need Spark to ask for more memory from YARN, on top of the memory that
is requested by --executor-memory. Your output indicates the default of 7%
is too little. For example you can ask for 20GB for executors and ask for
2GB of overhead. Spark will ask for 22GB from YARN. (Of course, YARN needs
to be set to allow containers of at least 22GB!)

On Thu, Jan 15, 2015 at 4:31 PM, Nitin kak nitinkak...@gmail.com wrote:

 Thanks for sticking to this thread.

 I am guessing what memory my app requests and what Yarn requests on my
 part should be same and is determined by the value of *--executor-memory*
 which I had set to *20G*. Or can the two values be different?

 I checked in Yarn configurations(below), so I think that fits well into
 the memory overhead limits.


 Container Memory Maximum
 yarn.scheduler.maximum-allocation-mb
  MiBGiB
 Reset to the default value: 64 GiB
 http://10.1.1.49:7180/cmf/services/108/config#
 Override Instances
 http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue=

 The largest amount of physical memory, in MiB, that can be requested for a
 container.





 On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
 
 

Re: SQL JSON array operations

2015-01-15 Thread Ayoub Benali
You could try to use the hive context, which brings HiveQL; it would allow you to
query nested structures using LATERAL VIEW explode...
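A hedged sketch of what that could look like for the JSON below, using a HiveContext on top of an 
existing SparkContext sc (untested; table and column names come from the example):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Lines like {"user": "baz", "tags": ["foo", "bar"]}
val users = hiveContext.jsonFile("data.json")
users.registerTempTable("users")

// Explode the tags array and filter on the exploded value.
hiveContext.sql(
  "SELECT user FROM users LATERAL VIEW explode(tags) t AS tag WHERE tag = 'bar'"
).collect().foreach(println)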
On Jan 15, 2015 4:03 PM, jvuillermet jeremy.vuiller...@gmail.com wrote:

 let's say my json file lines looks like this

 {user: baz, tags : [foo, bar] }
 

 sqlContext.jsonFile(data.json)
 ...
 How could I query for user with bar tags using SQL

 sqlContext.sql(select user from users where tags ?contains? 'bar' )

 I could simplify the request and use the returned RDD to filter on tags but
 I'm exploring an app where users can write their SQL queries




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: dockerized spark executor on mesos?

2015-01-15 Thread Tim Chen
Just throwing this out here, there is existing PR to add docker support for
spark framework to launch executors with docker image.

https://github.com/apache/spark/pull/3074

Hopefully this will be merged sometime.

Tim

On Thu, Jan 15, 2015 at 9:18 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 The AMPLab maintains a bunch of Docker files for Spark here:
 https://github.com/amplab/docker-scripts

 Hasn't been updated since 1.0.0, but might be a good starting point.

 On Wed Jan 14 2015 at 12:14:13 PM Josh J joshjd...@gmail.com wrote:

 We have dockerized Spark Master and worker(s) separately and are using it
 in
 our dev environment.


 Is this setup available on github or dockerhub?

 On Tue, Dec 9, 2014 at 3:50 PM, Venkat Subramanian vsubr...@gmail.com
 wrote:

 We have dockerized Spark Master and worker(s) separately and are using
 it in
 our dev environment. We don't use Mesos though, running it in Standalone
 mode, but adding Mesos should not be that difficult I think.

 Regards

 Venkat



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/dockerized-spark-executor-on-mesos-tp20276p20603.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Is spark suitable for large scale pagerank, such as 200 million nodes, 2 billion edges?

2015-01-15 Thread Ted Yu
Have you seen http://search-hadoop.com/m/JW1q5pE3P12 ?

Please also take a look at the end-to-end performance graph on
http://spark.apache.org/graphx/

Cheers
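For reference, a minimal GraphX PageRank sketch; the edge-list path and iteration count are 
placeholders, and sizing for 200 million nodes / 2 billion edges still depends on cluster memory:

import org.apache.spark.graphx.GraphLoader

// Assumes an existing SparkContext sc and an edge list of "srcId dstId" lines.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///path/to/edges")

// Run a fixed number of PageRank iterations and show the highest-ranked vertices.
val ranks = graph.staticPageRank(10).vertices
ranks.map { case (id, rank) => (rank, id) }.top(10).foreach(println)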

On Thu, Jan 15, 2015 at 9:29 AM, txw t...@outlook.com wrote:

 Hi,


 I am run PageRank on a large dataset, which include 200 million nodes and
 2 billion edges?

 Is spark suitable for large scale pagerank? How many cores and MEM do I
 need and how long will it take?


 Thanks


 Xuewei Tang



SQL JSON array operations

2015-01-15 Thread jvuillermet
let's say my json file lines look like this

{"user": "baz", "tags": ["foo", "bar"]}


sqlContext.jsonFile("data.json")
...
How could I query for users with the 'bar' tag using SQL?

sqlContext.sql("select user from users where tags ?contains? 'bar'")

I could simplify the request and use the returned RDD to filter on tags but
I'm exploring an app where users can write their SQL queries




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Inserting an element in RDD[String]

2015-01-15 Thread Hafiz Mujadid
thanks

On Thu, Jan 15, 2015 at 7:35 PM, Prannoy [via Apache Spark User List] 
ml-node+s1001560n21163...@n3.nabble.com wrote:

 Hi,

 You can take the schema line in another rdd and than do a union of the two
 rdd .

  List<String> schemaList = new ArrayList<String>();
  schemaList.add("xyz");

  // where "xyz" is your schema line

  JavaRDD<String> schemaRDD = sc.parallelize(schemaList);

  // where sc is your JavaSparkContext

  JavaRDD<String> newRDD = schemaRDD.union(yourRDD);

  // where yourRDD is the other RDD to which you want to prepend the
  schema line.

  The code is in Java; you can change it to Scala.

 Thanks.
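For convenience, a hedged Scala equivalent of the union approach described above; variable names 
are made up, and it assumes an existing SparkContext sc and an RDD[String] called yourRDD:

// Put the schema line into a one-element RDD and prepend it via union:
// the result concatenates schemaRDD's partitions before yourRDD's, so the
// schema line comes first when the result is saved or collected.
val schemaLine = "xyz"   // your schema line
val schemaRDD  = sc.parallelize(Seq(schemaLine))

val newRDD: org.apache.spark.rdd.RDD[String] = schemaRDD.union(yourRDD)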





 On Thu, Jan 15, 2015 at 7:46 PM, Hafiz Mujadid [via Apache Spark User
 List] wrote:

 hi experts!

 I hav an RDD[String] and i want to add schema line at beginning in this
 rdd. I know RDD is immutable. So is there anyway to have a new rdd with one
 schema line and contents of previous rdd?


 Thanks









-- 
Regards: HAFIZ MUJADID




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inserting-an-element-in-RDD-String-tp21161p21165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Sean Owen
Those settings aren't relevant, I think. You're concerned with what
your app requests, and what Spark requests of YARN on your behalf. (Of
course, you can't request more than what your cluster allows for a
YARN container for example, but that doesn't seem to be what is
happening here.)

You do not want to omit --executor-memory if you need large executor
memory heaps, since then you just request the default and that is
evidently not enough memory for your app.

Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
spark.yarn.executor.memoryOverhead. By default it's 7% of your 20G, or
about 1.4G. You might set this higher, to 2G, to give more overhead.

See the --conf property=value syntax documented in
http://spark.apache.org/docs/latest/submitting-applications.html
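For illustration, a minimal sketch of raising the overhead programmatically; the property name is 
from the running-on-yarn page linked above, and the 2048 MB figure is just the example from this 
thread:

import org.apache.spark.{SparkConf, SparkContext}

// Ask YARN for 20 GB executor heaps plus 2 GB of off-heap overhead,
// so each container request is roughly 22 GB.
val conf = new SparkConf()
  .setAppName("ConnectedComponentsTest")
  .set("spark.executor.memory", "20g")
  .set("spark.yarn.executor.memoryOverhead", "2048")   // value is in MB

val sc = new SparkContext(conf)

// Equivalent on the spark-submit command line:
//   spark-submit --executor-memory 20g \
//     --conf spark.yarn.executor.memoryOverhead=2048 ...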

On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote:
 Thanks Sean.

 I guess Cloudera Manager has parameters executor_total_max_heapsize and
 worker_max_heapsize which point to the parameters you mentioned above.

 How much should that cushon between the jvm heap size and yarn memory limit
 be?

 I tried setting jvm memory to 20g and yarn to 24g, but it gave the same
 error as above.

 Then, I removed the --executor-memory clause

 spark-submit --class ConnectedComponentsTest --master yarn-cluster
 --num-executors 7 --executor-cores 1
 target/scala-2.10/connectedcomponentstest_2.10-1.0.jar

 That is not giving GC, Out of memory exception

 15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was
 thrown by a user handler while handling an exception event ([id: 0x362d65d4,
 /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION: java.lang.OutOfMemoryError:
 GC overhead limit exceeded)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
   at java.lang.Object.clone(Native Method)
   at akka.util.CompactByteString$.apply(ByteString.scala:410)
   at akka.util.ByteString$.apply(ByteString.scala:22)
   at
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
   at
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
   at
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
   at
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
   at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
   at
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
   at
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
   at
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
   at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
   at 
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
   at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
   at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
   at
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
   at
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
   at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread
 SparkListenerBus
 java.lang.OutOfMemoryError: GC overhead limit exceeded
   at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
   at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
   at org.json4s.JsonDSL$class.seq2jvalue(JsonDSL.scala:68)
   at org.json4s.JsonDSL$.seq2jvalue(JsonDSL.scala:61)
   at
 org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127)
   at
 org.apache.spark.util.JsonProtocol$$anonfun$jobStartToJson$3.apply(JsonProtocol.scala:127)
   at org.json4s.JsonDSL$class.pair2jvalue(JsonDSL.scala:79)
   at org.json4s.JsonDSL$.pair2jvalue(JsonDSL.scala:61)
   at
 org.apache.spark.util.JsonProtocol$.jobStartToJson(JsonProtocol.scala:127)
   at
 org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:59)
   at
 org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:92)
   at
 

Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Nitin kak
I am sorry for the formatting error, the value for
*yarn.scheduler.maximum-allocation-mb
= 28G*

On Thu, Jan 15, 2015 at 11:31 AM, Nitin kak nitinkak...@gmail.com wrote:

 Thanks for sticking to this thread.

 I am guessing what memory my app requests and what Yarn requests on my
 part should be same and is determined by the value of *--executor-memory*
 which I had set to *20G*. Or can the two values be different?

 I checked in Yarn configurations(below), so I think that fits well into
 the memory overhead limits.


 Container Memory Maximum
 yarn.scheduler.maximum-allocation-mb
  MiBGiB
 Reset to the default value: 64 GiB
 http://10.1.1.49:7180/cmf/services/108/config#
 Override Instances
 http://10.1.1.49:7180/cmf/service/108/roleType/RESOURCEMANAGER/group/yarn-RESOURCEMANAGER-BASE/config/yarn_scheduler_maximum_allocation_mb?wizardMode=falsereturnUrl=%2Fcmf%2Fservices%2F108%2FconfigfilterValue=

 The largest amount of physical memory, in MiB, that can be requested for a
 container.





 On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
  15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread
  SparkListenerBus
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at
 

Re: Running beyond memory limits in ConnectedComponents

2015-01-15 Thread Nitin kak
Thanks for sticking to this thread.

I am guessing what memory my app requests and what Yarn requests on my part
should be the same and is determined by the value of *--executor-memory* which
I had set to *20G*. Or can the two values be different?

I checked the Yarn configuration (below), so I think that fits well into the
memory overhead limits.


Container Memory Maximum
yarn.scheduler.maximum-allocation-mb
Reset to the default value: 64 GiB

The largest amount of physical memory, in MiB, that can be requested for a
container.





On Thu, Jan 15, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Those settings aren't relevant, I think. You're concerned with what
 your app requests, and what Spark requests of YARN on your behalf. (Of
 course, you can't request more than what your cluster allows for a
 YARN container for example, but that doesn't seem to be what is
 happening here.)

 You do not want to omit --executor-memory if you need large executor
 memory heaps, since then you just request the default and that is
 evidently not enough memory for your app.

 Look at http://spark.apache.org/docs/latest/running-on-yarn.html and
 spark.yarn.executor.memoryOverhead  By default it's 7% of your 20G or
 about 1.4G. You might set this higher to 2G to give more overhead.

 See the --config property=value syntax documented in
 http://spark.apache.org/docs/latest/submitting-applications.html

 On Thu, Jan 15, 2015 at 3:47 AM, Nitin kak nitinkak...@gmail.com wrote:
  Thanks Sean.
 
  I guess Cloudera Manager has parameters executor_total_max_heapsize and
  worker_max_heapsize which point to the parameters you mentioned above.
 
  How much should that cushon between the jvm heap size and yarn memory
 limit
  be?
 
  I tried setting jvm memory to 20g and yarn to 24g, but it gave the same
  error as above.
 
  Then, I removed the --executor-memory clause
 
  spark-submit --class ConnectedComponentsTest --master yarn-cluster
  --num-executors 7 --executor-cores 1
  target/scala-2.10/connectedcomponentstest_2.10-1.0.jar
 
  That is not giving GC, Out of memory exception
 
  15/01/14 21:20:33 WARN channel.DefaultChannelPipeline: An exception was
  thrown by a user handler while handling an exception event ([id:
 0x362d65d4,
  /10.1.1.33:35463 = /10.1.1.73:43389] EXCEPTION:
 java.lang.OutOfMemoryError:
  GC overhead limit exceeded)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.Object.clone(Native Method)
at akka.util.CompactByteString$.apply(ByteString.scala:410)
at akka.util.ByteString$.apply(ByteString.scala:22)
at
 
 akka.remote.transport.netty.TcpHandlers$class.onMessage(TcpSupport.scala:45)
at
 
 akka.remote.transport.netty.TcpServerHandler.onMessage(TcpSupport.scala:57)
at
 
 akka.remote.transport.netty.NettyServerHelpers$class.messageReceived(NettyHelpers.scala:43)
at
 
 akka.remote.transport.netty.ServerHandler.messageReceived(NettyTransport.scala:179)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at
 
 org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at
 org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at
 org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
at
 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at
 org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
  15/01/14 21:20:33 ERROR util.Utils: Uncaught exception in thread
  SparkListenerBus
  java.lang.OutOfMemoryError: GC overhead limit exceeded
at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
at
 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
 
 

Re: save spark streaming output to single file on hdfs

2015-01-15 Thread Prannoy
Hi,

You can use the FileUtil.copyMerge API and point it at the folder where
saveAsTextFile wrote the part files.

Suppose your directory is /a/b/c/

Use FileUtil.copyMerge(FileSystem of source, a/b/c, FileSystem of
destination, path to the merged file, say a/b/c.txt, true (to delete the
original dir), null)
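For reference, a minimal Scala sketch of that call; paths are placeholders and the signature shown 
is the Hadoop 2.x one:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

// Merge the part-* files written under /a/b/c into a single /a/b/c.txt,
// deleting the source directory afterwards.
val hadoopConf = new Configuration()
val fs = FileSystem.get(hadoopConf)

FileUtil.copyMerge(
  fs, new Path("/a/b/c"),       // source FS and directory of part files
  fs, new Path("/a/b/c.txt"),   // destination FS and merged file
  true,                         // delete the source directory when done
  hadoopConf,
  null)                         // no separator string between merged files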

Thanks.

On Tue, Jan 13, 2015 at 11:34 PM, jamborta [via Apache Spark User List] 
ml-node+s1001560n21124...@n3.nabble.com wrote:

 Hi all,

 Is there a way to save dstream RDDs to a single file so that another
 process can pick it up as a single RDD?
 It seems that each slice is saved to a separate folder, using
 saveAsTextFiles method.

 I'm using spark 1.2 with pyspark

 thanks,










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-tp21124p21167.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

small error in the docs?

2015-01-15 Thread kirillfish
cogroup() seems to return (K, (Iterable<V>, Iterable<W>)), rather than 
(K, Iterable<V>, Iterable<W>) as stated in the docs (at least for 
version 1.1.0):

https://spark.apache.org/docs/1.1.0/programming-guide.html

This simple discrepancy cost me half a day of debugging and frustration.
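
A quick illustration of the shape actually returned (values are made up):

import org.apache.spark.SparkContext._   // pair RDD implicits

val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((1, 1.0), (3, 3.0)))
// the two Iterables arrive nested inside a tuple, not flattened into the outer tuple
val grouped: org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[Double]))] =
  left.cogroup(right)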
Kirill

Re: Error when running SparkPi on Secure HA Hadoop cluster

2015-01-15 Thread Marcelo Vanzin
You're specifying the queue in the spark-submit command line:

  --queue thequeue

Are you sure that queue exists?
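
As far as I can tell, the command-line --queue wins over spark.yarn.queue from
spark-defaults.conf, so a sketch of a fix (queue name taken from your conf file
below) is to either drop --queue or pass a queue that actually exists:

spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client \
  --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 \
  --queue myqueue $MY_SPARK_DIR/lib/spark-examples*.jar 10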


On Thu, Jan 15, 2015 at 11:23 AM, Manoj Samel manojsamelt...@gmail.com wrote:
 Hi,

 Setup is as follows

 Hadoop Cluster 2.3.0 (CDH5.0)
 - Namenode HA
 - Resource manager HA
 - Secured with Kerberos

 Spark 1.2

 Run SparkPi as follows
 - conf/spark-defaults.conf has following entries
 spark.yarn.queue myqueue
 spark.yarn.access.namenodes hdfs://namespace (remember this is namenode HA)
 - Do kinit with some user keytab
 - submit SparkPi as follows
 spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client
 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1
 --queue thequeue $MY_SPARK_DIR/lib/spark-examples*.jar 10

 Gives following trace (not sure why it shows unknown queue when the queue name
 is specified in spark-defaults.conf above).

 15/01/15 19:18:27 INFO impl.YarnClientImpl: Submitted application
 application_1415648563285_31469
 15/01/15 19:18:28 INFO yarn.Client: Application report for
 application_1415648563285_31469 (state: FAILED)
 15/01/15 19:18:28 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1415648563285_31469 submitted by user
 XYZ to unknown queue: thequeue --- WHY UNKNOWN QUEUE ???
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: thequeue   --- WHY UNKNOWN QUEUE ???
 start time: 1421349507652
 final status: FAILED
 tracking URL: N/A
 user: XYZ
 Exception in thread main org.apache.spark.SparkException: Yarn application
 has already ended! It might have been killed or unable to launch application
 master.
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102)
 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL JSON array operations

2015-01-15 Thread jvuillermet
yeah that's where I ended up. Thanks ! I'll give it a try.

On Thu, Jan 15, 2015 at 8:46 PM, Ayoub [via Apache Spark User List] 
ml-node+s1001560n21172...@n3.nabble.com wrote:

 You could try to use hive context which bring HiveQL, it would allow you
 to query nested structures using LATERAL VIEW explode...

 see doc
 https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView 
 here


 --
  If you reply to this email, your message will be added to the discussion
 below:

 http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164p21172.html





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164p21173.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Executor parameter doesn't work for Spark-shell on EMR Yarn

2015-01-15 Thread Shuai Zheng
Hi All,

 

I am testing Spark on EMR cluster. Env is a one node cluster r3.8xlarge. Has
32 vCore and 244G memory.

 

But the command-line parameters I use to start up spark-shell don't take effect. For
example:

 

~/spark/bin/spark-shell --jars
/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6
--executor-memory 10G

 

Neither num-executors nor memory setup works.

 

And more interesting, if I use test code:

val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75",
"-240990|161324,9051480,0,2,30.48,75"))

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

 

It will start 32 executors (then I assume it try to start all executors for
every vCore).

 

But if I use some real data to do it (the file size is 200M):

val lines = sc.textFile("s3://.../part-r-0") 

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

It will only start 4 executors, which map to the number of HDFS split (200M
will have 4 splits).

 

So I have two questions:

1, Why the setup parameter is ignored by Yarn? How can I limit the number of
executors I can run? 

2, Why my much smaller test data set will trigger 32 executors but my real
200M data set will only have 4 executors?

 

So how should I control the executor setup on the spark-shell? And when I print
the sparkConf, it shows much less than I expect, and I don't see my
passed-in parameters there.

 

scala> sc.getConf.getAll.foreach(println)

(spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0)

(spark.app.id,local-1421353031552)

(spark.eventLog.enabled,true)

(spark.executor.id,driver)

(spark.repl.class.uri,http://10.181.82.38:58415)

(spark.driver.host,ip-10-181-82-38.ec2.internal)

(spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70)

(spark.app.name,Spark shell)

(spark.fileserver.uri,http://10.181.82.38:54666)

(spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk
-1.9.14.jar)

(spark.eventLog.dir,hdfs:///spark-logs)

(spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hado
op/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hado
op/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

(spark.master,local[*])

(spark.driver.port,54191)

(spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop
/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop
/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)
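
One thing I notice in the dump above: spark.master is local[*], so the shell may
actually be running in local mode, where --num-executors and --executor-memory
would simply be ignored. Just a guess, but a sketch of passing the master
explicitly, with the same flags as above:

~/spark/bin/spark-shell --master yarn-client --num-executors 6 --executor-memory 10G \
  --jars /home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar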

 

I searched the old threads; the attached email answers the question about why the vCore
setup doesn't work, but I think that is not the same issue as mine. Otherwise the
default Yarn Spark setup couldn't be adjusted at all? 

 

Regards,

 

Shuai

 

 

 

 

---BeginMessage---
If you are using the capacity scheduler in yarn: By default the yarn capacity
scheduler uses DefaultResourceCalculator. DefaultResourceCalculator
considers only memory while allocating containers.
You can use DominantResourceCalculator, it considers memory and cpu.
In capacity-scheduler.xml set
yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator


On 04/11/14 3:03 am, Gen gen.tan...@gmail.com wrote:

Hi,

Well, I doesn't find original documentation, but according to
http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage
http://qnalist.com/questions/2791828/about-the-cpu-cores-and-cpu-usage
,
the vcores setting is not for physical cpu cores but for virtual cores.
And I used the top command to monitor the cpu utilization during the spark
task.
Spark can use all cpus even if I leave --executor-cores at the default (1).

Hope that it can be a help.
Cheers
Gen


Gen wrote
 Hi,
 
 Maybe it is a stupid question, but I am running spark on yarn. I request
 the resources by the following command:
 {code}
 ./spark-submit --master yarn-client --num-executors #number of worker
 --executor-cores #number of cores. ...
 {code}
 However, after launching the task, I use
/
 yarn node -status ID
/
  to monitor the situation of cluster. It shows that the number of Vcores
 used for each container is always 1 no matter what number I pass by
 --executor-cores.
 Any ideas how to solve this problem? Thanks a lot in advance for your
 help.
 
 Cheers
 Gen





--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/executor-cores-cannot-
change-vcores-in-yarn-tp17883p17992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


---End Message---


Re: SQL JSON array operations

2015-01-15 Thread Ayoub
You could try to use the hive context, which brings HiveQL; it would allow you to
query nested structures using LATERAL VIEW explode... 

see the doc here:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LateralView
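
For example, a rough sketch (the table and column names are hypothetical, and
"events.json" is just an illustrative input):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.jsonFile("events.json").registerTempTable("events")
// explode the nested array column "items" into one row per element
hiveContext.sql(
  "SELECT id, item FROM events LATERAL VIEW explode(items) itemTable AS item"
).collect().foreach(println)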



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164p21172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Executor vs Mapper in Hadoop

2015-01-15 Thread Shuai Zheng
Hi All,

 

I am trying to clarify some executor behavior in Spark. Because I come from a
Hadoop background, I try to compare it to the Mapper (or reducer) in
Hadoop.

 

1, Each node can have multiple executors, each run in its own process? This
is same as mapper process. 

 

2, I thought the spark executor will use multi-thread mode when there is
more than 1 core allocated to it (for example: set executor-cores to 5).
In this way, how many partitions can it process? For example, if the input is 20
partitions (similar to 20 splits as mapper input) and we have 5 executors,
each with 4 cores: will all these partitions be processed at the same time
(so each core processes one partition), or can one executor actually only run
one partition at a time?

 

I don't know whether my understanding is correct; please advise.

 

BTW: In general practice, should we always try to set executor-cores to
a higher number? That is, should we favor 10 cores * 2 executors over 2 cores * 10
executors? Any suggestion here? 

 

Thanks!

 

Regards,

 

Shuai



How to force parallel processing of RDD using multiple thread

2015-01-15 Thread Wang, Ningjun (LNG-NPV)
I have a standalone spark cluster with only one node with 4 CPU cores. How can 
I force spark to do parallel processing of my RDD using multiple threads? For 
example I can do the following

Spark-submit  --master local[4]

However I really want to use the cluster as follow

Spark-submit  --master spark://10.125.21.15:7070

In that case, how can I make sure the RDD is processed with multiple 
threads/cores?

Thanks
Ningjun



Re: Testing if an RDD is empty?

2015-01-15 Thread freedafeng
I think Sampo's idea is to get a function that only tests whether an RDD is
empty. He does not want to know the size of the RDD, and getting the size of
an RDD is expensive for large data sets. 

I myself have seen many times that my app threw exceptions because an empty
RDD cannot be saved. This is not a big issue, but it is annoying. Having a cheap
way to test whether an RDD is empty would be nice if there is no such thing
available now. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Testing-if-an-RDD-is-empty-tp1678p21175.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different akka versions and spark

2015-01-15 Thread Koert Kuipers
and to make things even more interesting:

The CDH *5.3* version of Spark 1.2 differs from the Apache Spark 1.2
release in using Akka version 2.2.3, the version used by Spark 1.1 and CDH
5.2. Apache Spark 1.2 uses Akka version 2.3.4.

so i just compiled a program that uses akka against apache spark 1.2.0, and
it indeed does not run on CDH5.3.0. i get class incompatibility errors.


On Tue, Jan 6, 2015 at 10:29 AM, Koert Kuipers ko...@tresata.com wrote:

 if the classes are in the original location than i think its safe to say
 that this makes it impossible for us to build one app that can run against
 spark 1.0.x, 1.1.x and spark 1.2.x.

 thats no big deal, but it does beg the question of what compatibility can
 reasonably be expected for spark 1.x series. i have seen a lot of focus on
 backwards compatibility of the spark 1.x api, but to me thats kind of a
 moot point if i cannot run apps against all 1.x versions anyhow since
 dependencies are not compatible.


 On Mon, Jan 5, 2015 at 5:08 PM, Marcelo Vanzin van...@cloudera.com
 wrote:

 Spark doesn't really shade akka; it pulls a different build (kept
 under the org.spark-project.akka group and, I assume, with some
 build-time differences from upstream akka?), but all classes are still
 in the original location.

 The upgrade is a little more unfortunate than just changing akka,
 since it also changes some transitive dependencies which also have
 compatibility issues (e.g. the typesafe config library). But I believe
 it's needed to support Scala 2.11...

 On Mon, Jan 5, 2015 at 8:27 AM, Koert Kuipers ko...@tresata.com wrote:
  since spark shaded akka i wonder if it would work, but i doubt it
 
  On Mon, Jan 5, 2015 at 9:56 AM, Cody Koeninger c...@koeninger.org
 wrote:
 
  I haven't tried it with spark specifically, but I've definitely run
 into
  problems trying to depend on multiple versions of akka in one project.
 
  On Sat, Jan 3, 2015 at 11:22 AM, Koert Kuipers ko...@tresata.com
 wrote:
 
  hey Ted,
  i am aware of the upgrade efforts for akka. however if spark 1.2
 forces
  me to upgrade all our usage of akka to 2.3.x while spark 1.0 and 1.1
 force
  me to use akka 2.2.x then we cannot build one application that runs
 on all
  spark 1.x versions, which i would consider a major incompatibility.
  best, koert
 
 
  On Sat, Jan 3, 2015 at 12:11 AM, Ted Yu yuzhih...@gmail.com wrote:
 
  Please see http://akka.io/news/2014/05/22/akka-2.3.3-released.html
 which
  points to
 
 http://doc.akka.io/docs/akka/2.3.3/project/migration-guide-2.2.x-2.3.x.html?_ga=1.35212129.1385865413.1420220234
 
  Cheers
 
  On Fri, Jan 2, 2015 at 9:11 AM, Koert Kuipers ko...@tresata.com
 wrote:
 
  i noticed spark 1.2.0 bumps the akka version. since spark uses it's
 own
  akka version, does this mean it can co-exist with another akka
 version in
  the same JVM? has anyone tried this?
 
  we have some spark apps that also use akka (2.2.3) and spray. if
  different akka versions causes conflicts then spark 1.2.0 would not
 be
  backwards compatible for us...
 
  thanks. koert
 
 
 
 
 



 --
 Marcelo





Re: save spark streaming output to single file on hdfs

2015-01-15 Thread jamborta
thanks for the replies. very useful.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-tp21124p21176.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark streaming python files not packaged in assembly jar

2015-01-15 Thread jamborta
Hi all, 

just discovered that the streaming folder in pyspark is not included in the
assembly jar (spark-assembly-1.2.0-hadoop2.3.0.jar), but included in the
python folder. Any reason why?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-python-files-not-packaged-in-assembly-jar-tp21177.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Executor parameter doesn't work for Spark-shell on EMR Yarn

2015-01-15 Thread Shuai Zheng
I figured out the second question: if I don't pass in the number of
partitions for the test data, it will by default assume the max number of executors
(although I don't know what this default max number is).

 

val lines = sc.parallelize(List("-240990|161327,9051480,0,2,30.48,75",
"-240990|161324,9051480,0,2,30.48,75"), 2)

will only trigger 2 executors.

 

So I think the default number of executors is decided by the first RDD
operation that needs to send work to executors. This gives me a weird way to control the
number of executors (run a fake/test piece of code first to kick off the executors,
then run the real behavior - because executors live for the whole
lifecycle of the application?). Although this may not have any real value in
practice :)

 

But I still need help for my first question. 

 

Thanks a lot.

 

Regards,

 

Shuai

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Thursday, January 15, 2015 4:03 PM
To: user@spark.apache.org
Subject: RE: Executor parameter doesn't work for Spark-shell on EMR Yarn

 

Forget to mention, I use EMR AMI 3.3.1, Spark 1.2.0. Yarn 2.4. The spark is
setup by the standard script:
s3://support.elasticmapreduce/spark/install-spark

 

 

From: Shuai Zheng [mailto:szheng.c...@gmail.com] 
Sent: Thursday, January 15, 2015 3:52 PM
To: user@spark.apache.org
Subject: Executor parameter doesn't work for Spark-shell on EMR Yarn

 

Hi All,

 

I am testing Spark on EMR cluster. Env is a one node cluster r3.8xlarge. Has
32 vCore and 244G memory.

 

But the command line I use to start up spark-shell, it can't work. For
example:

 

~/spark/bin/spark-shell --jars
/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/*.jar --num-executors 6
--executor-memory 10G

 

Neither num-executors nor memory setup works.

 

And more interesting, if I use test code:

val lines = sc.parallelize(List(-240990|161327,9051480,0,2,30.48,75,
-240990|161324,9051480,0,2,30.48,75))

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

 

It will start 32 executors (then I assume it try to start all executors for
every vCore).

 

But if I use some real data to do it (the file size is 200M):

val lines = sc.textFile(s3://.../part-r-0) 

var count = lines.mapPartitions(dynamoDBBatchWriteFunc).collect.sum

It will only start 4 executors, which map to the number of HDFS split (200M
will have 4 splits).

 

So I have two questions:

1, Why the setup parameter is ignored by Yarn? How can I limit the number of
executors I can run? 

2, Why my much smaller test data set will trigger 32 executors but my real
200M data set will only have 4 executors?

 

So how should I control the executor setup on the spark-shell? And I print
the sparkConf, it looks like much less than I expect, and I don't see my
pass in parameter show there.

 

scala sc.getConf.getAll.foreach(println)

(spark.tachyonStore.folderName,spark-af0c4d42-fe4d-40b0-a3cf-25b6a9e16fa0)

(spark.app.id,local-1421353031552)

(spark.eventLog.enabled,true)

(spark.executor.id,driver)

(spark.repl.class.uri,http://10.181.82.38:58415)

(spark.driver.host,ip-10-181-82-38.ec2.internal)

(spark.executor.extraJavaOptions,-verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70)

(spark.app.name,Spark shell)

(spark.fileserver.uri,http://10.181.82.38:54666)

(spark.jars,file:/home/hadoop/vrisc-lib/aws-java-sdk-1.9.14/lib/aws-java-sdk
-1.9.14.jar)

(spark.eventLog.dir,hdfs:///spark-logs)

(spark.executor.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hado
op/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hado
op/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

(spark.master,local[*])

(spark.driver.port,54191)

(spark.driver.extraClassPath,/home/hadoop/spark/classpath/emr/*:/home/hadoop
/spark/classpath/emrfs/*:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop
/.versions/2.4.0/share/hadoop/common/lib/hadoop-lzo.jar)

 

I search the old threads, attached email answer the question about why vCore
setup doesn't work. But I think this is not same issue as me. Otherwise then
default Yarn Spark setup can't do any adjustment? 

 

Regards,

 

Shuai

 

 

 

 



RE: using hiveContext to select a nested Map-data-type from an AVROmodel+parquet file

2015-01-15 Thread Cheng, Hao
Hi, BB
   Ideally you can do the query like: select key, value.percent from 
mytable_data lateral view explode(audiences) f as key, value limit 3;
   But there is a bug in HiveContext: 
https://issues.apache.org/jira/browse/SPARK-5237
   I am working on it now; hopefully I will have a patch soon.

Cheng Hao

-Original Message-
From: BB [mailto:bagme...@gmail.com] 
Sent: Friday, January 16, 2015 12:52 AM
To: user@spark.apache.org
Subject: using hiveContext to select a nested Map-data-type from an 
AVROmodel+parquet file

Hi all,
  Any help on the following is very much appreciated.
=
Problem:
  On a schemaRDD read from a parquet file (data within file uses AVRO model) 
using the HiveContext:
 I can't figure out how to 'select', or use a 'where' clause, to filter rows 
on a field that has a Map AVRO data type. I want to do filtering using a 
given ('key' : 'value'). How could I do this?
Details:
* the printSchema of the loaded schemaRDD is like so:

-- output snippet -
|-- created: long (nullable = false)
|-- audiences: map (nullable = true)
||-- key: string
||-- value: struct (valueContainsNull = false)
|||-- percent: float (nullable = false)
|||-- cluster: integer (nullable = false)
- 

* I don't get a result when I try to select on a specific value of the 
'audiences' like so:
 
  SELECT created, audiences FROM mytable_data LATERAL VIEW
explode(audiences) adtab AS adcol WHERE audiences['key']=='tg_loh' LIMIT 10

 sequence of commands on the spark-shell (a different query and output) is:

-- code snippet -
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> val parquetFile2 =
hiveContext.parquetFile("/home/myuser/myparquetfile")
scala> parquetFile2.registerTempTable("mytable_data")
scala> hiveContext.cacheTable("mytable_data")

scala> hiveContext.sql("SELECT audiences['key'], audiences['value'] FROM
mytable_data LATERAL VIEW explode(audiences) adu AS audien LIMIT
3").collect().foreach(println)

-- output -
[null,null]
[null,null]
[null,null]


gives a list of nulls. I can see that there is data when I just do the 
following (output is truncated):

-- code snippet -
scala> hiveContext.sql("SELECT audiences FROM mytable_data LATERAL VIEW
explode(audiences) tablealias AS colalias LIMIT
1").collect().foreach(println)

 output --
[Map(tg_loh -> [0.0,1,Map()], tg_co -> [0.0,1,Map(tg_co_petrol -> 0.0)], 
tg_wall -> [0.0,1,Map(tg_wall_poi -> 0.0)],  ...


Q1) What am I doing wrong?
Q2) How can I use 'where' in the query to filter on specific values?

What works:
   Queries with filtering, and selecting on fields that have simple AVRO 
data-types, such as long or string works fine.

===

 I hope the explanation makes sense. Thanks.
Best,
BB



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-hiveContext-to-select-a-nested-Map-data-type-from-an-AVROmodel-parquet-file-tp21168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: saveAsTextFile

2015-01-15 Thread ankits
I have seen this happen when the RDD contains null values. Essentially,
saveAsTextFile calls toString() on the elements of the RDD, so a call to
null.toString will result in an NPE.
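
A minimal guard against that (the output path is illustrative):

val safe = rdd.filter(_ != null)          // drop null elements before toString is called
safe.saveAsTextFile("hdfs:///tmp/output")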



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-tp20951p21178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Anyway to make RDD preserve input directories structures?

2015-01-15 Thread 逸君曹
say there's some logs:

s3://log-collections/sys1/20141212/nginx.gz
s3://log-collections/sys1/20141213/nginx-part-1.gz
s3://log-collections/sys1/20141213/nginx-part-2.gz

I have a function that parse the logs for later analysis.
I want to parse all the files. So I do this:

logs = sc.textFile('s3://log-collections/sys1/')
logs.map(parse).saveAsTextFile('s3://parsed-logs/')

BUT, this will destroy the date-separated naming schema, resulting in:

s3://parsed-logs/part-
s3://parsed-logs/part-0001
...

And the worst part is when I get a new day's logs:
it seems rdd.saveAsTextFile can't just append the new day's logs.

So I create an RDD for every single file, parse it, and save it to the name I
want, like this:

one = sc.textFile("s3://log-collections/sys1/20141213/nginx-part-1.gz")
one.map(parse).saveAsTextFile("s3://parsed-logs/20141213/01/")

which resulting:
s3://parsed-logs/20141212/01/part-
s3://parsed-logs/20141213/01/part-
s3://parsed-logs/20141213/01/part-0001
s3://parsed-logs/20141213/02/part-
s3://parsed-logs/20141213/02/part-0001
s3://parsed-logs/20141213/02/part-0002

And when a new day's log comes. I just process that day's logs and put to
the proper directory(or key)

THE PROBLEM is that this way I have to create a separate RDD for every single
file, which can't take advantage of Spark's automatic
parallel processing. [I'm trying to submit multiple applications for each batch
of files.]

Or maybe I'd better use hadoop streaming for this ?
Any suggestions?


Re: Testing if an RDD is empty?

2015-01-15 Thread Tobias Pfeiffer
Hi,

On Fri, Jan 16, 2015 at 7:31 AM, freedafeng freedaf...@yahoo.com wrote:

 I myself saw many times that my app threw out exceptions because an empty
 RDD cannot be saved. This is not big issue, but annoying. Having a cheap
 solution testing if an RDD is empty would be nice if there is no such thing
 available now.


I think the cheapest you can have is computing at least one element in the
RDD, which in the case of, say,

  val maybeEmptyRDD = veryExpensiveRDD.filter(false)

will be just as expensive as .count().

Tobias


Re: Executor vs Mapper in Hadoop

2015-01-15 Thread Sean Owen
An executor is specific to a Spark application, just as a mapper is
specific to a MapReduce job. So a machine will usually be running many
executors, and each is a JVM.

A Mapper is single-threaded; an executor can run many tasks (possibly
from different jobs within the application) at once. Yes, 5 executors
with 4 cores should be able to process 20 tasks in parallel.

In any normal case, you have 1 executor per machine per application.
There are cases where you would make more than 1, but these are
unusual.
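
As a sketch (app class and jar are just placeholders), these are the flags that
control this; with --num-executors 5 and --executor-cores 4 as in your example,
up to 5 * 4 = 20 tasks can run concurrently:

spark-submit --master yarn-client \
  --num-executors 5 \
  --executor-cores 4 \
  --executor-memory 8g \
  --class com.example.MyApp myapp.jar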

On Thu, Jan 15, 2015 at 8:16 PM, Shuai Zheng szheng.c...@gmail.com wrote:
 Hi All,



 I try to clarify some behavior in the spark for executor. Because I am from
 Hadoop background, so I try to compare it to the Mapper (or reducer) in
 hadoop.



 1, Each node can have multiple executors, each run in its own process? This
 is same as mapper process.



 2, I thought the spark executor will use multi-thread mode when there are
 more than 1 core to allocate to it (for example: set executor-cores to 5).
 In this way, how many partition it can process? For example, if input are 20
 partitions (similar as 20 split as mapper input) and we have 5 executors,
 each has 4 cores. Will all these partitions will be proceed as the same time
 (so each core process one partition) or actually one executor can only run
 one partition at the same time?



 I don’t know whether my understand is correct, please suggest.



 BTW: In general practice, should we always try to set the executor-cores to
 a higher number? So we will favor 10 cores * 2 executor than 2 cores*10
 executors? Any suggestion here?



 Thanks!



 Regards,



 Shuai

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Testing if an RDD is empty?

2015-01-15 Thread Sean Owen
How about checking whether take(1).length == 0? If I read the code
correctly, this will only examine the first partition, at least.
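
Something like this (just a sketch):

def isEmpty[T](rdd: org.apache.spark.rdd.RDD[T]): Boolean = rdd.take(1).length == 0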

On Fri, Jan 16, 2015 at 4:12 AM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 On Fri, Jan 16, 2015 at 7:31 AM, freedafeng freedaf...@yahoo.com wrote:

 I myself saw many times that my app threw out exceptions because an empty
 RDD cannot be saved. This is not big issue, but annoying. Having a cheap
 solution testing if an RDD is empty would be nice if there is no such
 thing
 available now.


 I think the cheapest you can have is computing at least one element in the
 RDD, which in the case of, say,

   val maybeEmptyRDD = veryExpensiveRDD.filter(false)

 will be just as expensive as .count().

 Tobias


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accumulators

2015-01-15 Thread Imran Rashid
Your understanding is basically correct.  Each task creates its own
local accumulator, and just those results get merged together.

However, there are some performance limitations to be aware of.  First, you
need enough memory on the executors to build up whatever those intermediate
results are.  Second, all the work of *merging* the results from each task
is done by the *driver*.  So if there is a lot of stuff to merge, that can
be slow, as it is not distributed at all.
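
If it helps, here is a rough sketch (Spark 1.x API; "rdd" stands for your
RDD[Map[String, Any]] and the names are only illustrative) of a set-union
accumulable along the lines of your example:

import org.apache.spark.AccumulableParam

object SetUnionParam extends AccumulableParam[Set[String], String] {
  def addAccumulator(acc: Set[String], elem: String): Set[String] = acc + elem  // per-task add
  def addInPlace(a: Set[String], b: Set[String]): Set[String] = a ++ b          // merged on the driver
  def zero(initial: Set[String]): Set[String] = Set.empty[String]
}

val entityTypes = sc.accumulable(Set.empty[String])(SetUnionParam)
rdd.foreach(m => entityTypes += m("entityType").toString)
println(entityTypes.value)

Note the foreach rather than map: map is lazy, so accumulator updates inside it
only happen once some action forces the computation.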

Hope that helps a little

Imran
On Jan 14, 2015 6:21 PM, Corey Nolet cjno...@gmail.com wrote:

 Just noticed an error in my wording.

 Should be  I'm assuming it's not immediately aggregating on the driver
 each time I call the += on the Accumulator.

 On Wed, Jan 14, 2015 at 9:19 PM, Corey Nolet cjno...@gmail.com wrote:

 What are the limitations of using Accumulators to get a union of a bunch
 of small sets?

 Let's say I have an RDD[Map{String,Any} and i want to do:

 rdd.map(accumulator += Set(_.get(entityType).get))


 What implication does this have on performance? I'm assuming it's not
 immediately aggregating each time I call the += on the Accumulator. Is it
 doing a local combine and then occasionally sending the results on the
 current partition back to the driver?







Re: Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Thanks Nicos. GC does not contribute much to the execution time of the task. I 
will debug it further today.
Regards, Ajay
 

 On Thursday, January 15, 2015 11:55 PM, Nicos n...@hotmail.com wrote:
   

 Ajay,
Unless we are dealing with some synchronization/conditional variable bug in
Spark, try this per the tuning guide:

Cache Size Tuning
One important configuration parameter for GC is the amount of memory that should be used for 
caching RDDs. By default, Spark uses 60% of the configured executor memory 
(spark.executor.memory) to cache RDDs. This means that 40% of memory is 
available for any objects created during task execution. In case your tasks slow 
down and you find that your JVM is garbage-collecting frequently or running out 
of memory, lowering this value will help reduce the memory consumption. To 
change this to, say, 50%, you can call conf.set("spark.storage.memoryFraction", 
"0.5") on your SparkConf. Combined with the use of serialized caching, using a 
smaller cache should be sufficient to mitigate most of the garbage collection 
problems. In case you are interested in further tuning the Java GC, continue 
reading below.

Complete list of tips here:
https://spark.apache.org/docs/latest/tuning.html#serialized-rdd-storage

Cheers,
- Nicos

On Jan 15, 2015, at 6:49 AM, Ajay Srivastava a_k_srivast...@yahoo.com.INVALID 
wrote:
Thanks RK. I can turn on speculative execution but I am trying to find out 
actual reason for delay as it happens on any node. Any idea about the stack 
trace in my previous mail.
Regards,Ajay
 

 On Thursday, January 15, 2015 8:02 PM, RK prk...@yahoo.com.INVALID wrote:
   

 If you don't want a few slow tasks to slow down the entire job, you can turn 
on speculation. 
Here are the speculation settings from Spark Configuration - Spark 1.2.0 
Documentation.
|   |
|   |   |   |   |   |
| Spark Configuration - Spark 1.2.0 DocumentationSpark Configuration Spark 
Properties Dynamically Loading Spark Properties Viewing Spark Properties 
Available Properties Application Properties Runtime Environment Shuffle 
Behavior Spark UI  |
|  |
| View on spark.apache.org | Preview by Yahoo |
|  |
|   |



| spark.speculation | false | If set to true, performs speculative execution 
of tasks. This means if one or more tasks are running slowly in a stage, they 
will be re-launched. |
| spark.speculation.interval | 100 | How often Spark will check for tasks to 
speculate, in milliseconds. |
| spark.speculation.quantile | 0.75 | Percentage of tasks which must be 
complete before speculation is enabled for a particular stage. |
| spark.speculation.multiplier | 1.5 | How many times slower a task is than the 
median to be considered for speculation.
 |

  

 On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava 
a_k_srivast...@yahoo.com.INVALID wrote:
   

 Hi,
My spark job is taking long time. I see that some tasks are taking longer time 
for same amount of data and shuffle read/write. What could be the possible 
reasons for it ?
The thread-dump sometimes show that all the tasks in an executor are waiting 
with following stack trace -
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  0x7fd0aee82e00 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at 

OutOfMemory error in Spark Core

2015-01-15 Thread Anand Mohan
We have our Analytics App built on Spark 1.1 Core, Parquet, Avro and Spray.
We are using Kryo serializer for the Avro objects read from Parquet and we
are using our custom Kryo registrator (along the lines of  ADAM
https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51
 
, we just added batched writes and flushes to Kryo's Output for each 512 MB
in the stream, as below 
outstream.array.sliding(512MB).foreach(buf => {
  kryoOut.write(buf)
  kryoOut.flush()
})
)

Our queries are done to a cached RDD(MEMORY_ONLY), that is obtained after 
1. loading bulk data from Parquet
2. union-ing it with incremental data in Avro
3. doing timestamp based duplicate removal (including partitioning in
reduceByKey) and 
4. joining a couple of MySQL tables using JdbcRdd

Of late, we are seeing major instabilities where the app crashes on a lost
executor which itself failed due to a OutOfMemory error as below. This looks
almost identical to https://issues.apache.org/jira/browse/SPARK-4885 even
though we are seeing this error in Spark 1.1

2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR
org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught
exception in thread Thread[handle-message-executor-13,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
at com.esotericsoftware.kryo.io.Output.write(Output.java:183)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30)
at
com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
at
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
at
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
at
org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
at
org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421)
at
org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387)
at
org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
at
org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
at
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at
org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)


The driver log is as below

15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor:
eventProcesserActor failed; shutting down SparkContext
java.util.NoSuchElementException: key not found: 2539
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
at

Spark SQL Custom Predicate Pushdown

2015-01-15 Thread Corey Nolet
I have document storage services in Accumulo that I'd like to expose to
Spark SQL. I am able to push down predicate logic to Accumulo to have it
perform only the seeks necessary on each tablet server to grab the results
being asked for.

I'm interested in using Spark SQL to push those predicates down to the
tablet servers. Where wouldI begin my implementation? Currently I have an
input format which accepts a query object that gets pushed down. How
would I extract this information from the HiveContext/SQLContext to be able
to push this down?


Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread mykidong
Hi Dibyendu,

I am using kafka 0.8.1.1 and spark 1.2.0.
After modifying these version of your pom, I have rebuilt your codes.
But I have not got any messages from ssc.receiverStream(new
KafkaReceiver(_props, i)).

I have found, in your codes, all the messages are retrieved correctly, but
_receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
class's method does not seem to work correctly.

Have you tried running your spark streaming kafka consumer with kafka
0.8.1.1 and spark 1.2.0 ?

- Kidong.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

No , I have not tried yet with Spark 1.2 yet. I will try this out and let
you know how this goes.

By the way, is there any change in Receiver Store method happened in Spark
1.2 ?



Regards,
Dibyendu



On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: OutOfMemory error in Spark Core

2015-01-15 Thread Akhil Das
Did you try increasing the parallelism?

Thanks
Best Regards

On Fri, Jan 16, 2015 at 10:41 AM, Anand Mohan chinn...@gmail.com wrote:

 We have our Analytics App built on Spark 1.1 Core, Parquet, Avro and Spray.
 We are using Kryo serializer for the Avro objects read from Parquet and we
 are using our custom Kryo registrator (along the lines of  ADAM
 
 https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala#L51
 
 , we just added batched writes and flushes to Kryo's Output for each 512 MB
 in the stream, as below
 outstream.array.sliding(512MB).foreach(buf = {
   kryoOut.write(buf)
   kryoOut.flush()
 })
 )

 Our queries are done to a cached RDD(MEMORY_ONLY), that is obtained after
 1. loading bulk data from Parquet
 2. union-ing it with incremental data in Avro
 3. doing timestamp based duplicate removal (including partitioning in
 reduceByKey) and
 4. joining a couple of MySQL tables using JdbcRdd

 Of late, we are seeing major instabilities where the app crashes on a lost
 executor which itself failed due to a OutOfMemory error as below. This
 looks
 almost identical to https://issues.apache.org/jira/browse/SPARK-4885 even
 though we are seeing this error in Spark 1.1

 2015-01-15 20:12:51,653 [handle-message-executor-13] ERROR
 org.apache.spark.executor.ExecutorUncaughtExceptionHandler - Uncaught
 exception in thread Thread[handle-message-executor-13,5,main]
 java.lang.OutOfMemoryError: Requested array size exceeds VM limit
 at java.util.Arrays.copyOf(Arrays.java:2271)
 at
 java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
 at
 java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
 at
 java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
 at
 java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
 at
 java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
 at com.esotericsoftware.kryo.io.Output.flush(Output.java:155)
 at com.esotericsoftware.kryo.io.Output.require(Output.java:135)
 at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:220)
 at com.esotericsoftware.kryo.io.Output.write(Output.java:183)
 at

 com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:31)
 at

 com.philips.hc.eici.analytics.streamingservice.AvroSerializer$$anonfun$write$1.apply(AnalyticsKryoRegistrator.scala:30)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at

 com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:30)
 at

 com.philips.hc.eici.analytics.streamingservice.AvroSerializer.write(AnalyticsKryoRegistrator.scala:18)
 at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
 at

 com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
 at
 com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
 at

 org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119)
 at

 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
 at

 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1047)
 at

 org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1056)
 at
 org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:154)
 at
 org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:421)
 at
 org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:387)
 at

 org.apache.spark.storage.BlockManagerWorker.getBlock(BlockManagerWorker.scala:100)
 at

 org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:79)
 at

 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
 at

 org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
 at

 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)


 The driver log is as below

 15/01/15 12:12:53 ERROR scheduler.DAGSchedulerActorSupervisor:
 eventProcesserActor failed; shutting down SparkContext
 java.util.NoSuchElementException: key not found: 2539
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:58)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
 at
 org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:799)
 

Re: How to query Spark master for cluster status ?

2015-01-15 Thread Akhil Das
There's a JSON endpoint in the Web UI (running on port 8080):
http://masterip:8080/json/
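
For example (masterip is a placeholder):

val json = scala.io.Source.fromURL("http://masterip:8080/json/").mkString
// json now holds the cluster status: workers, cores, memory and running applications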

Thanks
Best Regards

On Thu, Jan 15, 2015 at 6:30 PM, Shing Hing Man mat...@yahoo.com.invalid
wrote:

 Hi,

   I am using Spark 1.2.  The Spark master UI has a status.
 Is there a web service on the Spark master that returns the status of the
 cluster in Json ?

 Alternatively, what is the best way to determine if  a cluster is up.

 Thanks in advance for your assistance!

 Shing



Re: PySpark saveAsTextFile gzip

2015-01-15 Thread Akhil Das
You can use saveAsNewAPIHadoopFile
http://spark.apache.org/docs/1.1.0/api/python/pyspark.rdd.RDD-class.html#saveAsNewAPIHadoopFile
. You can use it to compress your output; here's some sample code
https://github.com/ScrapCodes/spark-1/blob/master/python/pyspark/tests.py#L1225
showing how to use the API.

Thanks
Best Regards

On Thu, Jan 15, 2015 at 5:16 PM, Tom Seddon mr.tom.sed...@gmail.com wrote:

 Hi,

 I've searched but can't seem to find a PySpark example.  How do I write
 compressed text file output to S3 using PySpark saveAsTextFile?

 Thanks,

 Tom



Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Dibyendu Bhattacharya
Hi Kidong,

Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
any issue with the Receiver.Store method. It is able to fetch messages from
Kafka.

Can you cross check other configurations in your setup like Kafka broker IP
, topic name, zk host details, consumer id etc.

Dib

On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No , I have not tried yet with Spark 1.2 yet. I will try this out and let
 you know how this goes.

 By the way, is there any change in Receiver Store method happened in Spark
 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these version of your pom, I have rebuilt your codes.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found, in your codes, all the messages are retrieved correctly, but
 _receiver.store(_dataBuffer.iterator())  which is spark streaming abstract
 class's method does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





RE: Spark SQL Custom Predicate Pushdown

2015-01-15 Thread Cheng, Hao
The Data Source API probably works for this purpose.
It supports column pruning and predicate push-down:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Examples can also be found in the unit tests:
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources
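
A rough sketch (written against the master-branch API linked above, so the exact
class shapes may differ in released versions; the relation below is backed by a
hard-coded Seq instead of Accumulo, just to show where the pruned columns and
pushed-down filters arrive):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical "documents" relation with two string columns, id and body.
class DocumentRelation(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override val schema: StructType =
    StructType(Seq(StructField("id", StringType), StructField("body", StringType)))

  // Spark SQL hands you the selected columns and the WHERE-clause predicates here;
  // a real implementation would translate `filters` into the Accumulo query object.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val data = Seq(("1", "hello"), ("2", "world"))
    val wantedIds = filters.collect { case EqualTo("id", v) => v.toString }.toSet
    val kept = if (wantedIds.isEmpty) data else data.filter { case (id, _) => wantedIds(id) }
    sqlContext.sparkContext.parallelize(kept).map { case (id, body) =>
      Row(requiredColumns.map { case "id" => id; case "body" => body }: _*)
    }
  }
}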


From: Corey Nolet [mailto:cjno...@gmail.com]
Sent: Friday, January 16, 2015 1:51 PM
To: user
Subject: Spark SQL Custom Predicate Pushdown

I have document storage services in Accumulo that I'd like to expose to Spark 
SQL. I am able to push down predicate logic to Accumulo to have it perform only 
the seeks necessary on each tablet server to grab the results being asked for.

I'm interested in using Spark SQL to push those predicates down to the tablet 
servers. Where wouldI begin my implementation? Currently I have an input format 
which accepts a query object that gets pushed down. How would I extract this 
information from the HiveContext/SQLContext to be able to push this down?


UserGroupInformation: PriviledgedActionException as

2015-01-15 Thread spraveen
Hi,

When I try to run a program on a remote spark machine I get
the exception below: 
15/01/16 11:14:39 ERROR UserGroupInformation: PriviledgedActionException
as:user1 (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: Futures
timed out after [30 seconds]
Exception in thread main java.lang.reflect.UndeclaredThrowableException:
Unknown exception in doAs
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.security.PrivilegedActionException:
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
... 4 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)


I am executing this driver class on one machine, say (abc), but spark is
installed on another machine (xyz:7077).
When I execute the same driver class pointing to local, it works fine.

public class TestSpark implements Serializable {

    public static void main(String[] args) throws IOException {
        TestSpark test = new TestSpark();
        test.testReduce();
    }

    public void testReduce() throws IOException {

        SparkConf conf = new SparkConf().setMaster("spark://xyz:7077").setAppName("Sample App");
        String[] pathToJar = {"/home/user1/Desktop/Jars/TestSpark.jar"};

        //SparkConf conf = new SparkConf().setMaster("spark://abc:7077").setAppName("Sample App").setJars(pathToJar);
        //SparkConf conf = new SparkConf().setMaster("local").setAppName("Sample App");

        JavaSparkContext jsc = new JavaSparkContext(conf);

        List<Integer> data = new ArrayList<Integer>();

        for (int i = 1; i < 500; i++) {
            data.add(i);
        }

        System.out.println("Size : " + data.size());

        JavaRDD<Integer> distData = jsc.parallelize(data);

        Integer total = distData.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                String s1 = "v1 : " + v1 + " v2 : " + v2 + " -- " + this;
                return v1 + v2;
            }
        });
        System.out.println("--" + total);
    }
}

I have also tried setting spark.driver.host :: xyz  spark.driver.port ::
7077 while creating the spark context, but it did not help.

Please advice




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UserGroupInformation-PriviledgedActionException-as-tp21182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2015-01-15 Thread Akhil Das
There was a simple example
https://github.com/dibbhatt/kafka-spark-consumer/blob/master/examples/scala/LowLevelKafkaConsumer.scala#L45
which you can run after changing few lines of configurations.

Thanks
Best Regards

On Fri, Jan 16, 2015 at 12:23 PM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 Just now I tested the Low Level Consumer with Spark 1.2 and I did not see
 any issue with the Receiver.Store method. It is able to fetch messages from
 Kafka.

 Can you cross check other configurations in your setup like Kafka broker
 IP , topic name, zk host details, consumer id etc.

 Dib

 On Fri, Jan 16, 2015 at 11:50 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 Hi Kidong,

 No, I have not tried it with Spark 1.2 yet. I will try this out and let
 you know how it goes.

 By the way, is there any change in Receiver Store method happened in
 Spark 1.2 ?



 Regards,
 Dibyendu



 On Fri, Jan 16, 2015 at 11:25 AM, mykidong mykid...@gmail.com wrote:

 Hi Dibyendu,

 I am using kafka 0.8.1.1 and spark 1.2.0.
 After modifying these versions in your pom, I have rebuilt your code.
 But I have not got any messages from ssc.receiverStream(new
 KafkaReceiver(_props, i)).

 I have found that, in your code, all the messages are retrieved correctly,
 but _receiver.store(_dataBuffer.iterator()), which is a method of the Spark
 Streaming Receiver abstract class, does not seem to work correctly.

 Have you tried running your spark streaming kafka consumer with kafka
 0.8.1.1 and spark 1.2.0 ?

 - Kidong.






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p21180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.







MatchError in JsonRDD.toLong

2015-01-15 Thread Tobias Pfeiffer
Hi,

I am experiencing a weird error that suddenly popped up in my unit tests. I
have a couple of HDFS files in JSON format and my test is basically
creating a JsonRDD and then issuing a very simple SQL query over it. This
used to work fine, but now suddenly I get:

15:58:49.039 [Executor task launch worker-1] ERROR executor.Executor -
Exception in task 1.0 in stage 29.0 (TID 117)
scala.MatchError: 14452800566866169008 (of class java.math.BigInteger)
at org.apache.spark.sql.json.JsonRDD$.toLong(JsonRDD.scala:282)
at org.apache.spark.sql.json.JsonRDD$.enforceCorrectType(JsonRDD.scala:353)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1$$anonfun$apply$12.apply(JsonRDD.scala:381)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:380)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$org$apache$spark$sql$json$JsonRDD$$asRow$1.apply(JsonRDD.scala:365)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.sql.json.JsonRDD$.org$apache$spark$sql$json$JsonRDD$$asRow(JsonRDD.scala:365)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$jsonStringToRow$1.apply(JsonRDD.scala:38)
...

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

The stack trace contains none of my classes, so it's a bit hard to track
down where this starts.

The code of JsonRDD.toLong is in fact

  private def toLong(value: Any): Long = {
    value match {
      case value: java.lang.Integer => value.asInstanceOf[Int].toLong
      case value: java.lang.Long => value.asInstanceOf[Long]
    }
  }

so if value is a BigInteger, toLong doesn't work. Now I'm wondering where
this comes from (I haven't touched this component in a while, nor upgraded
Spark etc.), but in particular I would like to know how to work around this.
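
One possible workaround (a sketch only; sqlContext is assumed to be the
existing SQLContext, and "bigField" is a made-up name for the overflowing
column) would be to pass an explicit schema so that column is read as a
string instead of being inferred as a long:

  import org.apache.spark.sql._

  // Declare the overflowing column as StringType so enforceCorrectType
  // never calls toLong on it; the other field is just a placeholder.
  val schema = StructType(Seq(
    StructField("id", LongType, nullable = true),
    StructField("bigField", StringType, nullable = true)
  ))

  val rows = sqlContext.jsonFile("hdfs:///path/to/data.json", schema)  // hypothetical path
  rows.registerTempTable("data")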

Thanks
Tobias


Re: SparkSQL schemaRDD MapPartitions calls - performance issues - columnar formats?

2015-01-15 Thread Nathan McCarthy
Thanks Cheng!

Is there any API I can get access to (e.g. ParquetTableScan) which would allow
me to load up the low-level/base RDD of just RDD[Row] so I could avoid the
defensive copy (and maybe lose out on columnar storage etc.)?

We have parts of our pipeline using SparkSQL/SchemaRDDs and others using the 
core RDD api (mapPartitions etc.). Any tips?

Out of curiosity, a lot of SparkSQL functions seem to run in a mapPartitions
(e.g. Distinct). Does a defensive copy happen there too?

Keen to get the best performance and the best blend of SparkSQL and functional 
Spark.

Cheers,
Nathan

From: Cheng Lian lian.cs@gmail.com
Date: Monday, 12 January 2015 1:21 am
To: Nathan nathan.mccar...@quantium.com.au,
Michael Armbrust mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues -
columnar formats?


On 1/11/15 1:40 PM, Nathan McCarthy wrote:
Thanks Cheng  Michael! Makes sense. Appreciate the tips!

Idiomatic scala isn't performant. I’ll definitely start using while loops or 
tail recursive methods. I have noticed this in the spark code base.

I might try turning off columnar compression (via 
spark.sql.inMemoryColumnarStorage.compressed=false correct?) and see how 
performance compares to the primitive objects. Would you expect to see similar 
runtimes vs the primitive objects? We do have the luxury of lots of memory at 
the moment so this might give us an additional performance boost.
Turning off compression should be faster, but still slower than directly using 
primitive objects. Because Spark SQL also serializes all objects within a 
column into byte buffers in a compact format. However, this radically reduces 
number of Java objects in the heap and is more GC friendly. When running large 
queries, cost introduced by GC can be significant.

Regarding the defensive copying of row objects. Can we switch this off and just 
be aware of the risks? Is MapPartitions on SchemaRDDs and operating on the Row 
object the most performant way to be flipping between SQL  Scala user code? Is 
there anything else I could be doing?
This can be very dangerous and error prone. Whenever an operator tries to cache 
row objects, turning off defensive copying can introduce wrong query result. 
For example, sort-based shuffle caches rows to do sorting. In some cases, 
sample operator may also cache row objects. This is very implementation 
specific and may change between versions.

Cheers,
~N

From: Michael Armbrust mich...@databricks.com
Date: Saturday, 10 January 2015 3:41 am
To: Cheng Lian lian.cs@gmail.com
Cc: Nathan nathan.mccar...@quantium.com.au, user@spark.apache.org
Subject: Re: SparkSQL schemaRDD & MapPartitions calls - performance issues -
columnar formats?

The other thing to note here is that Spark SQL defensively copies rows when we 
switch into user code.  This probably explains the difference between 1  2.

The difference between 1  3 is likely the cost of decompressing the column 
buffers vs. accessing a bunch of uncompressed primitive objects.

On Fri, Jan 9, 2015 at 6:59 AM, Cheng Lian lian.cs@gmail.com wrote:
Hey Nathan,

Thanks for sharing, this is a very interesting post :) My comments are inlined 
below.

Cheng

On 1/7/15 11:53 AM, Nathan McCarthy wrote:
Hi,

I’m trying to use a combination of SparkSQL and ‘normal' Spark/Scala via 
rdd.mapPartitions(…). Using the latest release 1.2.0.

Simple example; load up some sample data from parquet on HDFS (about 380m rows, 
10 columns) on a 7 node cluster.

  val t = sqlC.parquetFile("/user/n/sales-tran12m.parquet")
  t.registerTempTable("test1")
  sqlC.cacheTable("test1")

Now lets do some operations on it; I want the total sales & quantities sold for
each hour in the day so I choose 3 out of the 10 possible columns...

  sqlC.sql("select Hour, sum(ItemQty), sum(Sales) from test1 group by Hour").collect().foreach(println)

After the table has been 100% cached in memory, this takes around 11 seconds.

Lets do the same thing but via a MapPartitions call (this isn’t production
ready code but gets the job done).

  val try2 = sqlC.sql("select Hour, ItemQty, Sales from test1")
  rddPC.mapPartitions { case hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)

    for (r <- hrs) {
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)
I believe the evil thing that makes this snippet much slower is the for-loop. 
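
For reference, here is the same aggregation with the for comprehension replaced
by a plain while loop over the iterator (variable names as in the snippet above):

  rddPC.mapPartitions { hrs =>
    val qtySum = new Array[Double](24)
    val salesSum = new Array[Double](24)
    // A while loop avoids the extra closure generated for a for-comprehension body.
    while (hrs.hasNext) {
      val r = hrs.next()
      val hr = r.getInt(0)
      qtySum(hr) += r.getDouble(1)
      salesSum(hr) += r.getDouble(2)
    }
    (salesSum zip qtySum).zipWithIndex.map(_.swap).iterator
  }.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).collect().foreach(println)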

Re: Serializability: for vs. while loops

2015-01-15 Thread Aaron Davidson
Scala for-loops are implemented as closures using anonymous inner classes
which are instantiated once and invoked many times. This means, though,
that the code inside the loop is actually sitting inside a class, which
confuses Spark's Closure Cleaner, whose job is to remove unused references
from closures to make otherwise-unserializable objects serializable.

My understanding is, in particular, that the closure cleaner will null out
unused fields in the closure, but cannot go past the first level of depth
(i.e., it will not follow field references and null out *their *unused, and
possibly unserializable, references), because this could end up mutating
state outside of the closure itself. Thus, the extra level of depth of the
closure that was introduced by the anonymous class (where presumably the
outer this pointer is considered used by the closure cleaner) is
sufficient to make it unserializable.

While loops, on the other hand, involve none of this trickery, and everyone
is happy.
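
A rough illustration of the shape being described (all names here are made up,
and whether the for version actually fails depends on what the loop body
references and on the Scala/Spark versions involved):

  class JobRunner(sc: org.apache.spark.SparkContext) {
    // Something non-serializable held by the enclosing instance.
    val resource = new java.io.ByteArrayInputStream(new Array[Byte](0))

    def withFor(rdd: org.apache.spark.rdd.RDD[Int], maxId: Int): Unit = {
      // Desugars to (0 to (maxId - 1)).foreach(i => ...): the body becomes an
      // anonymous inner class, adding one level of closure nesting.
      for (i <- 0 to (maxId - 1)) {
        rdd.map(_ + i).count()
      }
    }

    def withWhile(rdd: org.apache.spark.rdd.RDD[Int], maxId: Int): Unit = {
      // No extra nesting: the map closure captures only the local var i.
      var i = 0
      while (i < maxId) {
        rdd.map(_ + i).count()
        i += 1
      }
    }
  }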

On Wed, Jan 14, 2015 at 11:37 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 sorry, I don't like questions about serializability myself, but still...

 Can anyone give me a hint why

   for (i - 0 to (maxId - 1)) {  ...  }

 throws a NotSerializableException in the loop body while

   var i = 0
   while (i  maxId) {
 // same code as in the for loop
 i += 1
   }

 works fine? I guess there is something fundamentally different in the way
 Scala realizes for loops?

 Thanks
 Tobias



Visualize Spark Job

2015-01-15 Thread Kuromatsu, Nobuyuki
Hi

I want to visualize tasks and stages in order to analyze spark jobs.
I know the necessary metrics are written to spark.eventLog.dir.

Does anyone know of a tool like swimlanes in Tez?

Regards,

Nobuyuki Kuromatsu




Error connecting to localhost:8060: java.net.ConnectException: Connection refused

2015-01-15 Thread Gautam Bajaj
Hi,

I'm new to Apache Storm.

I'm receiving data at my UDP port 8060, and I want to capture it and perform
some operations on it in real time, for which I'm using Spark Streaming. While
the code seems to be correct, I get the following output:
https://gist.github.com/d34th4ck3r/0e88896eac864d6d7193

I'm using the following command: mvn  -e -Dmaven.tomcat.port=8080 tomcat:run
exec:java -Dexec.mainClass=twoGrams.Main

Also, $ netstat -pn | grep 8060
returns nothing. Hence the port is free.

I think the problem may be that the interval within which Storm tries to
connect to the port is too short, but I'm not sure how to change it, or
whether that actually is the problem.

Any help would be appreciated.

Thanks,
Gautam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-connecting-to-localhost-8060-java-net-ConnectException-Connection-refused-tp21148.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Fast HashSets HashMaps - Spark Collection Utils

2015-01-15 Thread Sean Owen
A recent discussion says these won't be public. However there are many
optimized collection libs in Java. My favorite is Koloboke:
https://github.com/OpenHFT/Koloboke/wiki/Koloboke:-roll-the-collection-implementation-with-features-you-need
Carrot HPPC is good too. The only catch is that the libraries are huge so
you may end up using your build to chop out packages you don't need.
Otherwise it's 20+ MB of code.
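
For the distinct-counts-in-mapPartitions case from the original mail, the shape
is roughly this (a sketch assuming an RDD[Long]; a primitive set from one of
those libraries would drop in where the java.util.HashSet is):

  val distinctPerPartition = rdd.mapPartitions { iter =>
    val seen = new java.util.HashSet[Long]()  // or a Koloboke/HPPC primitive set
    iter.foreach(x => seen.add(x))
    Iterator.single(seen.size)                // one distinct count per partition
  }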
On Jan 15, 2015 4:05 AM, Night Wolf nightwolf...@gmail.com wrote:

 Hi all,

 I'd like to leverage some of the fast Spark collection implementations in
 my own code.

 Particularly for doing things like distinct counts in a mapPartitions
 loop.

 Are there any plans to make the org.apache.spark.util.collection
 implementations public? Is there any other library out there with similar
 performance?

 Cheers,
 NW



Re: ScalaReflectionException when using saveAsParquetFile in sbt

2015-01-15 Thread Pierre B
Same problem here...
Did u find a solution for this?

P.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ScalaReflectionException-when-using-saveAsParquetFile-in-sbt-tp21020p21150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




spark fault tolerance mechanism

2015-01-15 Thread YANG Fan
Hi,

I'm quite interested in how Spark's fault tolerance works and I'd like to
ask a question here.

According to the paper, there are two kinds of dependencies--the wide
dependency and the narrow dependency. My understanding is, if the
operations I use are all narrow, then when one machine crashes, the
system just need to recover the lost RDDs from the most recent checkpoint.
However, if all transformations are wide(e.g. in calculating PageRank),
then when one node crashes, all other nodes need to roll back to the most
recent checkpoint. Is my understanding correct?

Thanks!

Best Regards,
Fan


kinesis creating stream scala code exception

2015-01-15 Thread Hafiz Mujadid
Hi experts, I want to consume data from a Kinesis stream using Spark Streaming.
I am trying to create a Kinesis stream using Scala code. Here is my code:

def main(args: Array[String]) {
  println("Stream creation started")
  if (create(2))
    println("Stream is created successfully")
}

def create(shardCount: Int): Boolean = {
  val credentials = new BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
    KinesisProperties.AWS_SECRET_KEY)

  var kinesisClient: AmazonKinesisClient = new AmazonKinesisClient(credentials)
  kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
    KinesisProperties.KINESIS_SERVICE_NAME,
    KinesisProperties.KINESIS_REGION_ID)
  val createStreamRequest = new CreateStreamRequest()
  createStreamRequest.setStreamName(KinesisProperties.myStreamName)
  createStreamRequest.setShardCount(shardCount)
  val describeStreamRequest = new DescribeStreamRequest()
  describeStreamRequest.setStreamName(KinesisProperties.myStreamName)
  try {
    Thread.sleep(12)
  } catch {
    case e: Exception =>
  }
  var streamStatus = "not active"
  while (!streamStatus.equalsIgnoreCase("ACTIVE")) {
    try {
      Thread.sleep(1000)
    } catch {
      case e: Exception => e.printStackTrace()
    }
    try {
      val describeStreamResponse = kinesisClient.describeStream(describeStreamRequest)
      streamStatus = describeStreamResponse.getStreamDescription.getStreamStatus
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
  if (streamStatus.equalsIgnoreCase("ACTIVE"))
    true
  else
    false
}


When I run this code I get following exception

Exception in thread main java.lang.NoSuchMethodError:
org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/DateTimeFormatter;
at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:379)
at
com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
at
com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105)
at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307)
at
com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280)
at
com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(AmazonKinesisClient.java:2102)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:216)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:139)
at
com.amazonaws.services.kinesis.AmazonKinesisClient.init(AmazonKinesisClient.java:116)
at
com.platalytics.platform.connectors.Kinesis.App$.create(App.scala:32)
at
com.platalytics.platform.connectors.Kinesis.App$.main(App.scala:26)
at com.platalytics.platform.connectors.Kinesis.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)



I have following maven dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
    <version>1.2.0</version>
</dependency>


Any suggestion?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/kinesis-creating-stream-scala-code-exception-tp21154.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




PySpark saveAsTextFile gzip

2015-01-15 Thread Tom Seddon
Hi,

I've searched but can't seem to find a PySpark example.  How do I write
compressed text file output to S3 using PySpark saveAsTextFile?

Thanks,

Tom


Re: MissingRequirementError with spark 1.2

2015-01-15 Thread sarsol
I am also getting the same error after 1.2 upgrade.

application is crashing on this line

rdd.registerTempTable(temp)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MissingRequirementError-with-spark-tp21149p21152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: MissingRequirementError with spark

2015-01-15 Thread Pierre B
I found this, which might be useful:

https://github.com/deanwampler/spark-workshop/blob/master/project/Build.scala

I seems that forking is needed.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MissingRequirementError-with-spark-tp21149p21153.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Serializability: for vs. while loops

2015-01-15 Thread Tobias Pfeiffer
Aaron,

thanks for your mail!

On Thu, Jan 15, 2015 at 5:05 PM, Aaron Davidson ilike...@gmail.com wrote:

 Scala for-loops are implemented as closures using anonymous inner classes
 [...]
 While loops, on the other hand, involve none of this trickery, and
 everyone is happy.


Ah, I was suspecting something like that... thank you very much for the
detailed explanation!

Tobias


Re: RowMatrix multiplication

2015-01-15 Thread Toni Verbeiren
You can always define an RDD transpose function yourself. This is what I use in
PySpark to transpose an RDD of numpy vectors. It’s not optimal and the vectors
need to fit in memory on the worker nodes.

import numpy as np

def rddTranspose(rdd):
    # add an index to the rows and the columns, giving (rowIndex, colIndex, element) triplets
    dataT1 = rdd.zipWithIndex().flatMap(lambda (x,i): [(i,j,e) for (j,e) in enumerate(x)])
    # use the column index from the original as key, then group and sort
    dataT2 = dataT1.map(lambda (i,j,e): (j, (i,e)))\
                   .groupByKey().sortByKey()
    # sort the lists inside the rows by the original row index
    dataT3 = dataT2.map(lambda (i, x): sorted(list(x), cmp=lambda (i1,e1),(i2,e2): cmp(i1, i2)))
    # remove the indices inside the rows
    dataT4 = dataT3.map(lambda x: map(lambda (i, y): y, x))
    # convert to numpy arrays in the rows
    return dataT4.map(lambda x: np.asarray(x))

Cheers,
Toni

On 12 Jan 2015 at 20:45:58, Alex Minnaar (aminn...@verticalscope.com) wrote:

That's not quite what I'm looking for.  Let me provide an example.  I have a 
rowmatrix A that is nxm and I have two local matrices b and c.  b is mx1 and c 
is nx1.  In my spark job I wish to perform the following two computations



A*b



and



A^T*c



I don't think this is possible without being able to transpose a rowmatrix.  Am 
I correct?



Thanks,



Alex

From: Reza Zadeh r...@databricks.com
Sent: Monday, January 12, 2015 1:58 PM
To: Alex Minnaar
Cc: u...@spark.incubator.apache.org
Subject: Re: RowMatrix multiplication
 
As you mentioned, you can perform A * b, where A is a rowmatrix and b is a 
local matrix.

From your email, I figure you want to compute b * A^T. To do this, you can 
compute C = A b^T, whose result is the transpose of what you were looking for, 
i.e. C^T = b * A^T. To undo the transpose, you would have transpose C manually 
yourself. Be careful though, because the result might not have each Row fit in 
memory on a single machine, which is what RowMatrix requires. This danger is 
why we didn't provide a transpose operation in RowMatrix natively.

To address this and more, there is an effort to provide more comprehensive 
linear algebra through block matrices, which will likely make it to 1.3:
https://issues.apache.org/jira/browse/SPARK-3434

Best,
Reza

On Mon, Jan 12, 2015 at 6:33 AM, Alex Minnaar aminn...@verticalscope.com 
wrote:
I have a rowMatrix on which I want to perform two multiplications.  The first 
is a right multiplication with a local matrix which is fine.  But after that I 
also wish to right multiply the transpose of my rowMatrix with a different 
local matrix.  I understand that there is no functionality to transpose a 
rowMatrix at this time but I was wondering if anyone could suggest a any kind 
of work-around for this.  I had thought that I might be able to initially 
create two rowMatrices - a normal version and a transposed version - and use 
either when appropriate.  Can anyone think of another alternative?



Thanks,



Alex




Re: *ByKey aggregations: performance + order

2015-01-15 Thread Sean Owen
I'm interested too and don't know for sure but I do not think this case is
optimized this way. However if you know your keys aren't split across
partitions and you have small enough partitions you can implement the same
grouping with mapPartitions and Scala.
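
Roughly (a sketch, assuming pairs: RDD[(K, V)] and that each partition fits in
memory):

  val grouped = pairs.mapPartitions { iter =>
    // Plain Scala groupBy within the partition; no shuffle is triggered.
    iter.toSeq.groupBy(_._1).iterator.map { case (k, kvs) => (k, kvs.map(_._2)) }
  }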
On Jan 15, 2015 1:27 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Sean,

 thanks for your message.

 On Wed, Jan 14, 2015 at 8:36 PM, Sean Owen so...@cloudera.com wrote:

 On Wed, Jan 14, 2015 at 4:53 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:
  OK, it seems like even on a local machine (with no network overhead),
 the
  groupByKey version is about 5 times slower than any of the other
  (reduceByKey, combineByKey etc.) functions...

 Even without network overhead, you're still paying the cost of setting
 up the shuffle and serialization.


 Can I pick an appropriate scheduler some time before so that Spark knows
 all items with the same key are on the same host? (Or enforce this?)

 Thanks
 Tobias





Spark-Sql not using cluster slaves

2015-01-15 Thread ahmedabdelrahman
I have been using Spark SQL in cluster mode and I am noticing no distribution
or parallelization of the query execution. The performance seems to be very
slow compared to native Spark applications and does not offer any speedup
when compared to Hive. I am using Spark 1.1.0 with a cluster of 5 nodes. The
query that is being run consists of 4 subqueries, where each query creates
groups based on a variable, and then they are all unioned (I have attached
part of the query at the end of this post).

Implementing the query functionality in native Spark (Scala) seems to run in
a normal distributed manner as expected, but executing the query in Spark SQL
does not engage the slaves (observed from the master’s web UI). The
performance is the same whether I run the query on a cluster with a master and
4 slaves or on a single-node cluster (master only).

I would greatly appreciate any pointers on the cause of the issue.


Query Sample:

SELECT *
FROM   (
        SELECT   'age'     AS varname,
                 a.tile    AS catname,
                 a.myrandom,
                 count(*)  AS count
        FROM (
                SELECT *,
                       cast((5 * rand(65535)) AS int) AS myrandom,
                       CASE
                          WHEN ((age <= 19)) THEN 1
                          WHEN ((age <= 35)) THEN 2
                          WHEN ((age <= 51)) THEN 3
                          WHEN ((age <= 63)) THEN 4
                          ELSE 5
                       END AS tile
                FROM   tablea a) a
        GROUP BY a.tile,
                 a.myrandom
        UNION ALL
        SELECT   'salary'  AS varname,
                 a.tile    AS catname,
                 a.myrandom,
                 count(*)  AS count
        FROM (
                SELECT *,
                       cast((5 * rand(65535)) AS int) AS myrandom,
                       CASE
                          WHEN ((salary <= 39615)) THEN 1
                          WHEN ((salary <= 65740)) THEN 2
                          WHEN ((salary <= 117555)) THEN 3
                          ELSE 4
                       END AS tile
                FROM   tablea a) a
        GROUP BY a.tile,
                 a.myrandom ) unioned;




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Sql-not-using-cluster-slaves-tp21155.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




How to query Spark master for cluster status ?

2015-01-15 Thread Shing Hing Man
Hi,
  I am using Spark 1.2.  The Spark master UI has a status.Is there a web 
service on the Spark master that returns the status of the cluster in Json ?
Alternatively, what is the best way to determine if  a cluster is up.

Thanks in advance for your assistance!
Shing


Re: kinesis creating stream scala code exception

2015-01-15 Thread Aniket Bhatnagar
Are you using spark in standalone mode or yarn or mesos? If its yarn,
please mention the hadoop distribution and version. What spark distribution
are  you using (it seems 1.2.0 but compiled with which hadoop version)?

Thanks, Aniket

On Thu, Jan 15, 2015, 4:59 PM Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 Hi, Expert I want to consumes data from kinesis stream using spark
 streaming.
 I am trying to  create kinesis stream using scala code. Here is my code

 def main(args: Array[String]) {
 println(Stream creation started)
 if(create(2))
 println(Stream is created successfully)
 }
 def create(shardCount: Int): Boolean = {
 val credentials = new
 BasicAWSCredentials(KinesisProperties.AWS_ACCESS_KEY_ID,
 KinesisProperties.AWS_SECRET_KEY)

 var kinesisClient: AmazonKinesisClient = new
 AmazonKinesisClient(credentials)
 kinesisClient.setEndpoint(KinesisProperties.KINESIS_ENDPOINT_URL,
 KinesisProperties.KINESIS_SERVICE_NAME,
 KinesisProperties.KINESIS_REGION_ID)
 val createStreamRequest = new CreateStreamRequest()
 createStreamRequest.setStreamName(KinesisProperties.myStreamName);
 createStreamRequest.setShardCount(shardCount)
 val describeStreamRequest = new DescribeStreamRequest()
 describeStreamRequest.setStreamName(KinesisProperties.
 myStreamName)
 try {
 Thread.sleep(12)
 } catch {
 case e: Exception =
 }
 var streamStatus = not active
 while (!streamStatus.equalsIgnoreCase(ACTIVE)) {
 try {
 Thread.sleep(1000)
 } catch {
 case e: Exception = e.printStackTrace()
 }
 try {
 val describeStreamResponse =
 kinesisClient.describeStream(describeStreamRequest)
 streamStatus =
 describeStreamResponse.getStreamDescription.getStreamStatus
 } catch {
 case e: Exception = e.printStackTrace()
 }
 }
 if (streamStatus.equalsIgnoreCase(ACTIVE))
 true
 else
 false
 }


 When I run this code I get following exception

 Exception in thread main java.lang.NoSuchMethodError:
 org.joda.time.format.DateTimeFormatter.withZoneUTC()Lorg/joda/time/format/
 DateTimeFormatter;
 at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(
 NativeConstructorAccessorImpl.java:57)
 at
 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
 DelegatingConstructorAccessorImpl.java:45)
 at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
 at java.lang.Class.newInstance(Class.java:379)
 at
 com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119)
 at
 com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(
 SignerFactory.java:105)
 at com.amazonaws.auth.SignerFactory.getSigner(
 SignerFactory.java:78)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(
 AmazonWebServiceClient.java:307)
 at
 com.amazonaws.AmazonWebServiceClient.computeSignerByURI(
 AmazonWebServiceClient.java:280)
 at
 com.amazonaws.AmazonWebServiceClient.setEndpoint(
 AmazonWebServiceClient.java:160)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.setEndpoint(
 AmazonKinesisClient.java:2102)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.
 init(AmazonKinesisClient.java:216)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.
 init(AmazonKinesisClient.java:139)
 at
 com.amazonaws.services.kinesis.AmazonKinesisClient.
 init(AmazonKinesisClient.java:116)
 at
 com.platalytics.platform.connectors.Kinesis.App$.create(App.scala:32)
 at
 com.platalytics.platform.connectors.Kinesis.App$.main(App.scala:26)
 at com.platalytics.platform.connectors.Kinesis.App.main(App.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
 57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(
 DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)



 I have following maven dependency
 dependency
 groupIdorg.apache.spark/groupId
 artifactIdspark-streaming-kinesis-asl_2.10/artifactId
 version1.2.0/version
 /dependency


 Any suggestion?



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/kinesis-creating-stream-scala-code-
 exception-tp21154.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Using native blas with mllib

2015-01-15 Thread lev
Hi,
I'm trying to use the native blas, and I followed all the threads I saw here
and I still can't get rid of those warning:
WARN netlib.BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeSystemBLAS
WARN netlib.BLAS: Failed to load implementation from:
com.github.fommil.netlib.NativeRefBLAS

I'm using:
Centos 6.5
java 1.7.0_71
spark 1.1.0
running on yarn cluster

and the things I done so far are:

- install those packeges:
  - lapack-devel
  - blas-devel
  - atlas-devel
  - openblas-devel
  - gcc-gfortran.x86_64
  - libgfortran.x86_64 

  
- added those dependencies to mllib pom.xml:
<dependency>
    <groupId>org.scalanlp</groupId>
    <artifactId>breeze-natives_${scala.binary.version}</artifactId>
    <version>0.9</version>
</dependency>
<dependency>
    <groupId>com.github.fommil.netlib</groupId>
    <artifactId>all</artifactId>
    <version>1.1.2</version>
    <type>pom</type>
</dependency>

(I know that the -Pnetlib-lgpl flag should have taken care of the netlib
dependency, but for some reason it didn't work)
  
  
- build spark with the command: mvn -Pyarn -Phive -Phadoop-2.5 -Pnetlib-lgpl
-Dhadoop.version=2.5.0-cdh5.2.0 -DskipTests clean package
(I'm building it on windows if it makes any difference)


And I did the follwing checks:
sudo /sbin/ldconfig -p | grep libblas.so.3
libblas.so.3 (libc6,x86-64) => /usr/lib64/libblas.so.3

sudo /sbin/ldconfig -p | grep liblapack.so.3
liblapack.so.3 (libc6,x86-64) => /usr/lib64/liblapack.so.3
liblapack.so.3 (libc6,x86-64) => /usr/lib64/atlas/liblapack.so.3

jar tf
assembly\target\scala-2.10\spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar
| grep netlib-native_system-linux-x86_64.so
netlib-native_system-linux-x86_64.so
netlib-native_system-linux-x86_64.so.asc

Anything else I can try?

Thanks,
Lev.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-native-blas-with-mllib-tp21156.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Using Spark SQL with multiple (avro) files

2015-01-15 Thread David Jones
I've tried this now. Spark can load multiple avro files from the same
directory by passing a path to a directory. However, passing multiple paths
separated with commas didn't work.


Is there any way to load all avro files in multiple directories using
sqlContext.avroFile?
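
A fallback that may work here (a sketch only; the directory list is assumed,
and the spark-avro import that provides avroFile is assumed to be in scope)
would be to load each directory separately and union the resulting SchemaRDDs:

  val dirs = Seq("s3://my-bucket/avros/a/DATE/", "s3://my-bucket/avros/b/DATE/")
  val all = dirs.map(dir => sqlContext.avroFile(dir)).reduce(_ unionAll _)
  all.registerTempTable("data")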

On Wed, Jan 14, 2015 at 3:53 PM, David Jones letsnumsperi...@gmail.com
wrote:

 Should I be able to pass multiple paths separated by commas? I haven't
 tried but didn't think it'd work. I'd expected a function that accepted a
 list of strings.

 On Wed, Jan 14, 2015 at 3:20 PM, Yana Kadiyska yana.kadiy...@gmail.com
 wrote:

 If the wildcard path you have doesn't work you should probably open a bug
 -- I had a similar problem with Parquet and it was a bug which recently got
 closed. Not sure if sqlContext.avroFile shares a codepath with 
 .parquetFile...you
 can try running with bits that have the fix for .parquetFile or look at the
 source...
 Here was my question for reference:

 http://mail-archives.apache.org/mod_mbox/spark-user/201412.mbox/%3ccaaswr-5rfmu-y-7htluj2eqqaecwjs8jh+irrzhm7g1ex7v...@mail.gmail.com%3E

 On Wed, Jan 14, 2015 at 4:34 AM, David Jones letsnumsperi...@gmail.com
 wrote:

 Hi,

 I have a program that loads a single avro file using spark SQL, queries
 it, transforms it and then outputs the data. The file is loaded with:

 val records = sqlContext.avroFile(filePath)
 val data = records.registerTempTable(data)
 ...


 Now I want to run it over tens of thousands of Avro files (all with
 schemas that contain the fields I'm interested in).

 Is it possible to load multiple avro files recursively from a top-level
 directory using wildcards? All my avro files are stored under
 s3://my-bucket/avros/*/DATE/*.avro, and I want to run my task across all of
 these on EMR.

 If that's not possible, is there some way to load multiple avro files
 into the same table/RDD so the whole dataset can be processed (and in that
 case I'd supply paths to each file concretely, but I *really* don't want to
 have to do that).

 Thanks
 David






Re: MissingRequirementError with spark

2015-01-15 Thread sarsol
Added fork := true in the Scala build.

Command-line sbt is working fine, but the Eclipse Scala IDE is still giving the
same error.

This was all working fine until Spark 1.1.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MissingRequirementError-with-spark-tp21149p21159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Inserting an element in RDD[String]

2015-01-15 Thread Hafiz Mujadid
Hi experts!

I have an RDD[String] and I want to add a schema line at the beginning of this
RDD. I know RDDs are immutable. So is there any way to have a new RDD with one
schema line followed by the contents of the previous RDD?


Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inserting-an-element-in-RDD-String-tp21161.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to define SparkContext with Cassandra connection for spark-jobserver?

2015-01-15 Thread abhishek
In the Spark Job Server bin folder, you will find the application.conf file;
put

 context-settings {

spark.cassandra.connection.host =  ur address
  }

Hope this works.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-define-SparkContext-with-Cassandra-connection-for-spark-jobserver-tp21119p21162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Some tasks are taking long time

2015-01-15 Thread Ajay Srivastava
Hi,
My spark job is taking long time. I see that some tasks are taking longer time 
for same amount of data and shuffle read/write. What could be the possible 
reasons for it ?
The thread-dump sometimes show that all the tasks in an executor are waiting 
with following stack trace -
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  0x7fd0aee82e00 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve job time will be appreciated.
Regards,Ajay



Re: saveAsTextFile

2015-01-15 Thread Prannoy
Hi,

Before saving the rdd do a collect to the rdd and print the content of the
rdd. Probably its a null value.

Thanks.

On Sat, Jan 3, 2015 at 5:37 PM, Pankaj Narang [via Apache Spark User List] 
ml-node+s1001560n20953...@n3.nabble.com wrote:

 If you can paste the code here I can certainly help.

 Also confirm the version of spark you are using

 Regards
 Pankaj
 Infoshore Software
 India






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/saveAsTextFile-tp20951p21160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Failing jobs runs twice

2015-01-15 Thread Anders Arpteg
Found a setting that seems to fix this problem, but it does not seems to be
available until Spark 1.3. See
https://issues.apache.org/jira/browse/SPARK-2165

However, I'm glad to see work is being done on the issue.

On Tue, Jan 13, 2015 at 8:00 PM, Anders Arpteg arp...@spotify.com wrote:

 Yes Andrew, I am. Tried setting spark.yarn.applicationMaster.waitTries to
 1 (thanks Sean), but with no luck. Any ideas?

 On Tue, Jan 13, 2015 at 7:58 PM, Andrew Or and...@databricks.com wrote:

 Hi Anders, are you using YARN by any chance?

 2015-01-13 0:32 GMT-08:00 Anders Arpteg arp...@spotify.com:

 Since starting using Spark 1.2, I've experienced an annoying issue with
 failing apps that gets executed twice. I'm not talking about tasks inside a
 job, that should be executed multiple times before failing the whole app.
 I'm talking about the whole app, that seems to close the previous Spark
 context, start a new, and rerun the app again.

 This is annoying since it overwrite the log files as well and it becomes
 hard to troubleshoot the failing app. Does anyone know how to turn this
 feature off?

 Thanks,
 Anders






Error when running SparkPi on Secure HA Hadoop cluster

2015-01-15 Thread Manoj Samel
Hi,

Setup is as follows

Hadoop Cluster 2.3.0 (CDH5.0)
- Namenode HA
- Resource manager HA
- Secured with Kerberos

Spark 1.2

Run SparkPi as follows
- conf/spark-defaults.conf has following entries
spark.yarn.queue myqueue
spark.yarn.access.namenodes hdfs://namespace (remember this is namenode HA)
- Do kinit with some user keytab
- submit SparkPi as follows
spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client
--num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores
1 --queue thequeue $MY_SPARK_DIR/lib/spark-examples*.jar 10

Gives the following trace (not sure why it shows an unknown queue when the
queue name is specified in the spark-defaults.conf above).

15/01/15 19:18:27 INFO impl.YarnClientImpl: Submitted application
application_1415648563285_31469
15/01/15 19:18:28 INFO yarn.Client: Application report for
application_1415648563285_31469 (state: FAILED)
15/01/15 19:18:28 INFO yarn.Client:
 client token: N/A
 diagnostics: Application application_1415648563285_31469 submitted by user
XYZ to unknown queue: thequeue --- WHY UNKNOWN QUEUE ???
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: thequeue   --- WHY UNKNOWN QUEUE ???
 start time: 1421349507652
 final status: FAILED
 tracking URL: N/A
 user: XYZ
Exception in thread main org.apache.spark.SparkException: Yarn
application has already ended! It might have been killed or unable to
launch application master.
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:102)
at
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:58)


Re: Inserting an element in RDD[String]

2015-01-15 Thread Aniket Bhatnagar
Sure there is. Create a new RDD just containing the schema line (hint: use
sc.parallelize) and then union both the RDDs (the header RDD and data RDD)
to get a final desired RDD.
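
A minimal sketch in Scala (assuming data: RDD[String] and schemaLine: String):

  val header = sc.parallelize(Seq(schemaLine))  // single-line RDD with the schema
  val withHeader = header.union(data)           // header partition comes before the data partitions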

On Thu Jan 15 2015 at 19:48:52 Hafiz Mujadid hafizmujadi...@gmail.com
wrote:

 hi experts!

 I hav an RDD[String] and i want to add schema line at beginning in this
 rdd.
 I know RDD is immutable. So is there anyway to have a new rdd with one
 schema line and contents of previous rdd?


 Thanks



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Inserting-an-element-in-RDD-String-tp21161.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Some tasks are taking long time

2015-01-15 Thread RK
If you don't want a few slow tasks to slow down the entire job, you can turn on 
speculation. 
Here are the speculation settings from Spark Configuration - Spark 1.2.0 
Documentation.



| spark.speculation | false | If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched. |
| spark.speculation.interval | 100 | How often Spark will check for tasks to speculate, in milliseconds. |
| spark.speculation.quantile | 0.75 | Percentage of tasks which must be complete before speculation is enabled for a particular stage. |
| spark.speculation.multiplier | 1.5 | How many times slower a task is than the median to be considered for speculation. |
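
For example, to turn it on from code (a sketch; the other values shown are
just the defaults listed above):

  val conf = new org.apache.spark.SparkConf()
    .set("spark.speculation", "true")
    .set("spark.speculation.interval", "100")    // ms between speculation checks
    .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish first
    .set("spark.speculation.multiplier", "1.5")  // how much slower than the median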

  

 On Thursday, January 15, 2015 5:44 AM, Ajay Srivastava 
a_k_srivast...@yahoo.com.INVALID wrote:
   

 Hi,
My spark job is taking long time. I see that some tasks are taking longer time 
for same amount of data and shuffle read/write. What could be the possible 
reasons for it ?
The thread-dump sometimes show that all the tasks in an executor are waiting 
with following stack trace -
Executor task launch worker-12 daemon prio=10 tid=0x7fcd44276000 
nid=0x3f85 waiting on condition [0x7fcce3ddc000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  0x7fd0aee82e00 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(Unknown Source)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)
    at java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:253)
    at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
    at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
    
Any inputs/suggestions to improve job time will be appreciated.
Regards,Ajay



   

Re: Inserting an element in RDD[String]

2015-01-15 Thread Prannoy
Hi,

You can take the schema line in another RDD and then do a union of the two
RDDs.

List<String> schemaList = new ArrayList<String>();
schemaList.add("xyz");

// where "xyz" is your schema line

JavaRDD<String> schemaRDD = sc.parallelize(schemaList);

// where sc is your JavaSparkContext

JavaRDD<String> newRDD = schemaRDD.union(yourRDD);

// where yourRDD is the other RDD, at the start of which you want to add the
// schema line.

The code is in Java; you can change it to Scala.

Thanks.





On Thu, Jan 15, 2015 at 7:46 PM, Hafiz Mujadid [via Apache Spark User List]
ml-node+s1001560n21161...@n3.nabble.com wrote:

 hi experts!

 I hav an RDD[String] and i want to add schema line at beginning in this
 rdd. I know RDD is immutable. So is there anyway to have a new rdd with one
 schema line and contents of previous rdd?


 Thanks






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Inserting-an-element-in-RDD-String-tp21161p21163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: How to force parallel processing of RDD using multiple thread

2015-01-15 Thread Sean Owen
Check the number of partitions in your input. It may be much less than
the available parallelism of your small cluster. For example, input
that lives in just 1 partition will spawn just 1 task.

Beyond that parallelism just happens. You can see the parallelism of
each operation in the Spark UI.
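
For example (a sketch with a hypothetical input path):

  val rdd = sc.textFile("hdfs:///path/to/input")
  println(rdd.partitions.size)   // upper bound on tasks for the next stage
  val rdd4 = rdd.repartition(4)  // spread the data across 4 partitions/cores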

On Thu, Jan 15, 2015 at 10:53 PM, Wang, Ningjun (LNG-NPV)
ningjun.w...@lexisnexis.com wrote:
 Spark Standalone cluster.

 My program is running very slow, I suspect it is not doing parallel 
 processing of rdd. How can I force it to run parallel? Is there anyway to 
 check whether it is processed in parallel?

 Regards,

 Ningjun Wang
 Consulting Software Engineer
 LexisNexis
 121 Chanlon Road
 New Providence, NJ 07974-1541


 -Original Message-
 From: Sean Owen [mailto:so...@cloudera.com]
 Sent: Thursday, January 15, 2015 4:29 PM
 To: Wang, Ningjun (LNG-NPV)
 Cc: user@spark.apache.org
 Subject: Re: How to force parallel processing of RDD using multiple thread

 What is your cluster manager? For example on YARN you would specify 
 --executor-cores. Read:
 http://spark.apache.org/docs/latest/running-on-yarn.html

 On Thu, Jan 15, 2015 at 8:54 PM, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:
 I have a standalone spark cluster with only one node with 4 CPU cores.
 How can I force spark to do parallel processing of my RDD using
 multiple threads? For example I can do the following



 Spark-submit  --master local[4]



 However I really want to use the cluster as follow



 Spark-submit  --master spark://10.125.21.15:7070



 In that case, how can I make sure the RDD is processed with multiple
 threads/cores?



 Thanks

 Ningjun






Re: Anyway to make RDD preserve input directories structures?

2015-01-15 Thread Sean Owen
Maybe you are saying you already do this, but it's perfectly possible
to process as many RDDs as you like in parallel on the driver. That
may allow your current approach to eat up as much parallelism as you
like. I'm not sure if that's what you are describing with submit
multi applications but you do not need separate Spark applications,
just something like a local parallel Scala collection invoking the RDD
operations.

Yes, these operations do not and should not 'append' data to existing files.

Of course this has the downside of all that overhead of processing
every file individually rather than as one big job. Although you may
not be able to design this differently, I suggest that ideally you do
not encode this information in directory structure, but in the data
itself. I know sometimes this is important for downstream tools
though, as it is part of how partitioning is defined for example.
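
A sketch of that idea, driving several small jobs concurrently from one
application with a parallel collection (paths as in the mail below; parse
stands for the existing parsing function):

  val days = Seq("20141212", "20141213")  // assumed list of dates to process
  days.par.foreach { day =>
    sc.textFile(s"s3://log-collections/sys1/$day/")
      .map(parse)
      .saveAsTextFile(s"s3://parsed-logs/$day/01/")
  }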


On Fri, Jan 16, 2015 at 2:15 AM, 逸君曹 caoyijun2...@gmail.com wrote:
 say there's some logs:

 s3://log-collections/sys1/20141212/nginx.gz
 s3://log-collections/sys1/20141213/nginx-part-1.gz
 s3://log-collections/sys1/20141213/nginx-part-2.gz

 I have a function that parse the logs for later analysis.
 I want to parse all the files. So I do this:

 logs = sc.textFile('s3://log-collections/sys1/')
 logs.map(parse).saveAsTextFile('s3://parsed-logs/')

 BUT, this will destroy the date separate naming shema.resulting:

 s3://parsed-logs/part-
 s3://parsed-logs/part-0001
 ...

 And the worse part is that when I got a new day logs.
 It seems rdd.saveAsTextFile couldn't just append the new day's log.

 So I create a RDD for every single file.and parse it, save to the name I
 want.like this:

 one = sc.textFile(s3://log-collections/sys1/20141213/nginx-part-1.gz)
 one.map(parse).saveAsTextFile(s3://parsed-logs/20141213/01/)

 which resulting:
 s3://parsed-logs/20141212/01/part-
 s3://parsed-logs/20141213/01/part-
 s3://parsed-logs/20141213/01/part-0001
 s3://parsed-logs/20141213/02/part-
 s3://parsed-logs/20141213/02/part-0001
 s3://parsed-logs/20141213/02/part-0002

 And when a new day's log comes. I just process that day's logs and put to
 the proper directory(or key)

 THE PROBLEM is this way I have to create a seperated RDD for every single
 file.
 which couldn't take advantage of Spark's functionality of automatic parallel
 processing.[I'm trying to submit multi applications for each batch of
 files.]

 Or maybe I'd better use hadoop streaming for this ?
 Any suggestions?




RE: Issue with Parquet on Spark 1.2 and Amazon EMR

2015-01-15 Thread Bozeman, Christopher
Thanks to Aniket’s work there are two new options to the EMR install script for
Spark.   See 
https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/README.md 
The “-a” option can be used to bump the spark-assembly to the front of the 
classpath.

-Christopher


From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com]
Sent: Monday, January 12, 2015 3:05 AM
To: Kelly, Jonathan; Adam Gilmore; user@spark.apache.org
Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR

Meanwhile, I have submitted a pull request 
(https://github.com/awslabs/emr-bootstrap-actions/pull/37) that allows users to 
place their jars ahead of all other jars in spark classpath. This should serve 
as a temporary workaround for all class conflicts.

Thanks,
Aniket

On Mon Jan 05 2015 at 22:13:47 Kelly, Jonathan jonat...@amazon.com wrote:
I've noticed the same thing recently and will contact the appropriate owner 
soon.  (I work for Amazon, so I'll go through internal channels and report back 
to this list.)

In the meantime, I've found that editing spark-env.sh and putting the Spark 
assembly first in the classpath fixes the issue.  I expect that the version of 
Parquet that's being included in the EMR libs just needs to be upgraded.

~ Jonathan Kelly

From: Aniket Bhatnagar aniket.bhatna...@gmail.com
Date: Sunday, January 4, 2015 at 10:51 PM
To: Adam Gilmore dragoncu...@gmail.com, user@spark.apache.org
Subject: Re: Issue with Parquet on Spark 1.2 and Amazon EMR


Can you confirm your emr version? Could it be because of the classpath entries 
for emrfs? You might face issues with using S3 without them.

Thanks,
Aniket

On Mon, Jan 5, 2015 at 11:16 AM, Adam Gilmore dragoncu...@gmail.com wrote:
Just an update on this - I found that the script by Amazon was the culprit - 
not exactly sure why.  When I installed Spark manually onto the EMR (and did 
the manual configuration of all the EMR stuff), it worked fine.

On Mon, Dec 22, 2014 at 11:37 AM, Adam Gilmore dragoncu...@gmail.com wrote:
Hi all,

I've just launched a new Amazon EMR cluster and used the script at:

s3://support.elasticmapreduce/spark/install-spark

to install Spark (this script was upgraded to support 1.2).

I know there are tools to launch a Spark cluster in EC2, but I want to use EMR.

Everything installs fine; however, when I go to read from a Parquet file, I end 
up with (the main exception):

Caused by: java.lang.NoSuchMethodError:
parquet.hadoop.ParquetInputSplit.<init>(Lorg/apache/hadoop/fs/Path;JJJ[Ljava/lang/String;[JLjava/lang/String;Ljava/util/Map;)V
at 
parquet.hadoop.TaskSideMetadataSplitStrategy.generateTaskSideMDSplits(ParquetInputFormat.java:578)
... 55 more

It seems to me like a version mismatch somewhere.  Where is the parquet-hadoop 
jar coming from?  Is it built into a fat jar for Spark?

Any help would be appreciated.  Note that 1.1.1 worked fine with Parquet files.