Re: Spark cluster tuning recommendation

2016-07-11 Thread Anuj Kumar
That configuration looks off: only two cores are in use and only 1GB of
memory is allocated to the app. A few points-

1. Oversubscribe the CPUs to at least twice the number of physical cores you
have to start with, then tune down if the machines freeze
2. Allocate all of the CPU cores and memory to your running app (I assume
this is your test environment)
3. Assuming each machine is quad-core, defining 8 cores per worker gives you
56 cores (CPU threads) across the 7 workers (see the sketch after this list)
4. It also depends on the source you are reading the data from. If you are
reading from HDFS, what is your block size and part count?
5. You may also have to tune the timeouts and frame size based on the
dataset and the errors that you are facing
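
To make that concrete, here is a rough sketch of how those knobs could be set
programmatically for a standalone cluster like yours. The values below are
only illustrative starting points, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only -- tune per points 1-5 above.
val conf = new SparkConf()
  .setAppName("terasort-tuning-sketch")
  .set("spark.executor.memory", "3g")       // leave some headroom on a 4GB worker
  .set("spark.executor.cores", "8")         // oversubscribed: 2x the physical quad-core
  .set("spark.cores.max", "56")             // let the app use the whole cluster
  .set("spark.default.parallelism", "112")  // roughly 2x total cores as a start
val sc = new SparkContext(conf)

The same properties can also be passed to spark-submit with --conf instead of
being hard-coded.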

We have run terasort with a couple of high-end worker machines reading and
writing from HDFS, with 5-10 mount points allocated for HDFS and Spark local
storage. We used multiple configurations, e.g. 10W-10CPU-10GB and
25W-6CPU-6GB (workers / CPUs per worker / memory per worker) running on each
of the two machines, with 512MB HDFS blocks and 1000-2000 parts. With all of
them communicating over 10GbE, these setups worked well.

On Tue, Jul 12, 2016 at 3:39 AM, Kartik Mathur  wrote:

> I am trying to run terasort in Spark, for a 7 node cluster with only 10g
> of data, and executors get lost with a GC overhead limit exceeded error.
>
> This is what my cluster looks like -
>
>
>- *Alive Workers:* 7
>- *Cores in use:* 28 Total, 2 Used
>- *Memory in use:* 56.0 GB Total, 1024.0 MB Used
>- *Applications:* 1 Running, 6 Completed
>- *Drivers:* 0 Running, 0 Completed
>- *Status:* ALIVE
>
> Each worker has 8 cores and 4GB memory.
>
> My question is: how do people running in production decide these
> properties -
>
> 1) --num-executors
> 2) --executor-cores
> 3) --executor-memory
> 4) num of partitions
> 5) spark.default.parallelism
>
> Thanks,
> Kartik
>
>
>


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Anuj Kumar
Good point Mike +1
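
For reference, a minimal sketch of the "dummy" synchronization stage Mike
suggests below, assuming a SparkContext named sc (the sizes are arbitrary):

// Run a throwaway job first so that every executor has registered with the
// driver and received at least one task before the real stages start.
val warmup = sc.parallelize(1 to 100000, sc.defaultParallelism * 4)
warmup.map(_ + 1).count()   // materialize the dummy stage, then discard it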

On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:

> When submitting a job with spark-submit, I've observed delays (up to
> 1--2 seconds) for the executors to respond to the driver in order to
> receive tasks in the first stage. The delay does not persist once the
> executors have been synchronized.
>
> When the tasks are very short, as may be your case (relatively small
> data and a simple map task like you have described), the 8 tasks in
> your stage may be allocated to only 1 executor in 2 waves of 4, since
> the second executor won't have responded to the master before the
> first 4 tasks on the first executor have completed.
>
> To see if this is the cause in your particular case, you could try the
> following to confirm:
> 1. Examine the starting times of the tasks alongside their executor
> 2. Make a "dummy" stage execute before your real stages to
> synchronize the executors by creating and materializing any random RDD
> 3. Make the tasks longer, i.e. with some silly computational work.
>
> Mike
>
>
> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> > Yes its the same data.
> >
> > 1) The number of partitions are the same (8, which is an argument to the
> > HashPartitioner). In the first case, these partitions are spread across
> > both the worker nodes. In the second case, all the partitions are on the
> > same node.
> > 2) What resources would be of interest here? Scala shell takes the default
> > parameters since we use "bin/spark-shell --master " to run the
> > scala-shell. For the scala program, we do set some configuration options
> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> > serializer.
> >
> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> > RAM. 1 executor runs on each worker node. Following configuration options
> > are set for the scala program -- perhaps we should move it to the spark
> > config file.
> >
> > Driver memory and executor memory are set to 12GB
> > parallelism is set to 8
> > Kryo serializer is used
> > Number of retainedJobs and retainedStages has been increased to check them
> > in the UI.
> >
> > What information regarding Spark Context would be of interest here?
> >
> > Regards,
> > Raghava.
> >
> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
> >
> >> If the data file is same then it should have similar distribution of
> >> keys.
> >> Few queries-
> >>
> >> 1. Did you compare the number of partitions in both the cases?
> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
> >> Program being submitted?
> >>
> >> Also, can you please share the details of Spark Context, Environment and
> >> Executors when you run via Scala program?
> >>
> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> m.vijayaragh...@gmail.com> wrote:
> >>
> >>> Hello All,
> >>>
> >>> We are using HashPartitioner in the following way on a 3 node cluster (1
> >>> master and 2 worker nodes).
> >>>
> >>> val u =
> >>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> >>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
> >>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
> >>>
> >>> u.count()
> >>>
> >>> If we run this from the spark shell, the data (52 MB) is split across the
> >>> two worker nodes. But if we put this in a scala program and run it, then
> >>> all the data goes to only one node. We have run it multiple times, but
> >>> this behavior does not change. This seems strange.
> >>>
> >>> Is there some problem with the way we use HashPartitioner?
> >>>
> >>> Thanks in advance.
> >>>
> >>> Regards,
> >>> Raghava.
> >>>
> >>
> >>
> >
> >
> > --
> > Regards,
> > Raghava
> > http://raghavam.github.io
> >
>
>
> --
> Thanks,
> Mike
>


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Anuj Kumar
A few params like spark.task.cpus and spark.cores.max will help. Also, for
52MB of data you do not need 12GB allocated to the executors. Better to
reduce the executor memory to 512MB or so for this case and increase the
number of executors per worker node.
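
As a rough sketch (the values are only illustrative and assume the standalone
master), that could look like:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: small executors are enough for a 52MB dataset.
val conf = new SparkConf()
  .setAppName("hash-partitioner-test")
  .set("spark.executor.memory", "512m")  // far less than 12g is needed here
  .set("spark.executor.cores", "1")      // more, smaller executors per worker
  .set("spark.cores.max", "8")           // cap on total cores used by the app
  .set("spark.task.cpus", "1")           // the default; shown only for completeness
val sc = new SparkContext(conf)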

On Mon, Apr 18, 2016 at 9:07 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Yes its the same data.
>
> 1) The number of partitions are the same (8, which is an argument to the
> HashPartitioner). In the first case, these partitions are spread across
> both the worker nodes. In the second case, all the partitions are on the
> same node.
> 2) What resources would be of interest here? Scala shell takes the default
> parameters since we use "bin/spark-shell --master " to run the
> scala-shell. For the scala program, we do set some configuration options
> such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> serializer.
>
> We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> RAM. 1 executor runs on each worker node. Following configuration options
> are set for the scala program -- perhaps we should move it to the spark
> config file.
>
> Driver memory and executor memory are set to 12GB
> parallelism is set to 8
> Kryo serializer is used
> Number of retainedJobs and retainedStages has been increased to check them
> in the UI.
>
> What information regarding Spark Context would be of interest here?
>
> Regards,
> Raghava.
>
> On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>
>> If the data file is same then it should have similar distribution of
>> keys. Few queries-
>>
>> 1. Did you compare the number of partitions in both the cases?
>> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> Program being submitted?
>>
>> Also, can you please share the details of Spark Context, Environment and
>> Executors when you run via Scala program?
>>
>> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> m.vijayaragh...@gmail.com> wrote:
>>
>>> Hello All,
>>>
>>> We are using HashPartitioner in the following way on a 3 node cluster (1
>>> master and 2 worker nodes).
>>>
>>> val u =
>>> sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
>>> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
>>> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>>>
>>> u.count()
>>>
>>> If we run this from the spark shell, the data (52 MB) is split across
>>> the two worker nodes. But if we put this in a scala program and run it,
>>> then all the data goes to only one node. We have run it multiple times, but
>>> this behavior does not change. This seems strange.
>>>
>>> Is there some problem with the way we use HashPartitioner?
>>>
>>> Thanks in advance.
>>>
>>> Regards,
>>> Raghava.
>>>
>>
>>
>
>
> --
> Regards,
> Raghava
> http://raghavam.github.io
>


Re: strange HashPartitioner behavior in Spark

2016-04-17 Thread Anuj Kumar
If the data file is same then it should have similar distribution of keys.
Few queries-

1. Did you compare the number of partitions in both the cases?
2. Did you compare the resource allocation for Spark Shell vs Scala Program
being submitted?

Also, can you please share the details of Spark Context, Environment and
Executors when you run via Scala program?
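
To answer point 1 concretely, something like the following, run both from the
shell and from the program, should show the partition count and how the
records are spread across the 8 partitions (assuming the RDD is named u as in
your snippet):

// Number of partitions after partitionBy(new HashPartitioner(8))
println(u.partitions.length)

// Record count per partition index -- shows whether the partitions are
// balanced, skewed or empty. Where each task ran is visible in the web UI.
u.mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
  .foreach { case (idx, n) => println(s"partition $idx -> $n records") }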

On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
m.vijayaragh...@gmail.com> wrote:

> Hello All,
>
> We are using HashPartitioner in the following way on a 3 node cluster (1
> master and 2 worker nodes).
>
> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt").map[(Int,
> Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt,
> x.toInt) } }).partitionBy(new HashPartitioner(8)).setName("u").persist()
>
> u.count()
>
> If we run this from the spark shell, the data (52 MB) is split across the
> two worker nodes. But if we put this in a scala program and run it, then
> all the data goes to only one node. We have run it multiple times, but this
> behavior does not change. This seems strange.
>
> Is there some problem with the way we use HashPartitioner?
>
> Thanks in advance.
>
> Regards,
> Raghava.
>


Re: ERROR: "Size exceeds Integer.MAX_VALUE" Spark 1.5

2015-10-05 Thread Anuj Kumar
You may be hitting the 2GB limit. See-
https://issues.apache.org/jira/browse/SPARK-5928
https://issues.apache.org/jira/browse/SPARK-6190
https://issues.apache.org/jira/browse/SPARK-6235

Increasing the number of partitions might help-
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
but I have not tried this configuration parameter myself.
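
If you want to experiment with it, a sketch (untested on my side) for Spark
1.5 would be to raise spark.sql.shuffle.partitions before running the query:

// Raise the number of shuffle partitions (the default is 200) so that no
// single shuffle block grows past the 2GB limit.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

val results = sqlContext.sql("SELECT guid FROM clickstream GROUP BY guid")
results.take(10).foreach(println)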

OTOH, I didn't understand the motive of the query. What exactly is the
purpose? Are you looking for distinct guids?
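
If distinct guids are indeed the goal, a simpler formulation of the same
query might be:

val distinctGuids = sqlContext.sql("SELECT DISTINCT guid FROM clickstream")
distinctGuids.take(10).foreach(println)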

Regards,
Anuj

On Tue, Oct 6, 2015 at 3:42 AM, Muhammad Ahsan wrote:

> Hello Everyone !
>
> I am working with spark 1.5 over YARN. I am trying something like
>
> val results = sqlContext.sql("SELECT guid FROM clickstream group by guid")
>
> results.take(10).foreach(println)
>
>
> But I am getting the following error. I am using data frames and unable to
> resolve this error, please help
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 815 in stage 13.0 failed 4 times, most recent failure: Lost task 815.3 in
> stage 13.0 (TID 3612, amd-014.test.com): java.lang.RuntimeException:
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
> at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
> at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> at