Israel Spark Meetup

2016-09-20 Thread Romi Kuntsman
Hello,
Please add a link on the Spark Community page
(https://spark.apache.org/community.html)
to the Israel Spark Meetup (https://www.meetup.com/israel-spark-users/).
We're an active meetup group, unifying the local Spark user community and
holding regular meetups.
Thanks!
Romi K.


Re: Shuffle FileNotFound Exception

2015-11-18 Thread Romi Kuntsman
Take the executor memory times spark.shuffle.memoryFraction,
and divide the data so that each partition is smaller than that.
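For example, a back-of-the-envelope sketch in Java, using the 60GB / ~10 cores
figures from your message (the shuffle size and variable names are assumptions
for illustration):

long executorMemory = 60L * 1024 * 1024 * 1024;    // bytes per executor
double shuffleFraction = 0.2;                      // spark.shuffle.memoryFraction (default)
int coresPerExecutor = 10;                         // concurrent tasks per executor
long perTaskBudget = (long) (executorMemory * shuffleFraction) / coresPerExecutor;

long estimatedShuffleSize = 50L * 1024 * 1024 * 1024;                // your shuffled data size
int numPartitions = (int) (estimatedShuffleSize / perTaskBudget) + 1;
JavaRDD<String> repartitioned = inputRdd.repartition(numPartitions); // inputRdd is assumed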

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Wed, Nov 18, 2015 at 2:09 PM, Tom Arnfeld <t...@duedil.com> wrote:

> Hi Romi,
>
> Thanks! Could you give me an indication of how much to increase the
> partitions by? We’ll take a stab in the dark, the input data is around 5M
> records (though each record is fairly small). We’ve had trouble both with
> DataFrames and RDDs.
>
> Tom.
>
> On 18 Nov 2015, at 12:04, Romi Kuntsman <r...@totango.com> wrote:
>
> I had many issues with shuffles (but not this one exactly), and what
> eventually solved it was to repartition the input into more parts. Have you
> tried that?
>
> P.S. not sure if related, but there's a memory leak in the shuffle
> mechanism
> https://issues.apache.org/jira/browse/SPARK-11293
>
> *Romi Kuntsman*, *Big Data Engineer*
> http://www.totango.com
>
> On Wed, Nov 18, 2015 at 2:00 PM, Tom Arnfeld <t...@duedil.com> wrote:
>
>> Hey,
>>
>> I’m wondering if anyone has run into issues with Spark 1.5 and a
>> FileNotFound exception with shuffle.index files? It’s been cropping up with
>> very large joins and aggregations, and causing all of our jobs to fail
>> towards the end. The memory limit for the executors (we’re running on
>> mesos) is touching 60GB+ with ~10 cores per executor, which is way
>> oversubscribed.
>>
>> We’re running spark inside containers, and have configured
>> “spark.executorEnv.SPARK_LOCAL_DIRS” to a special directory path in the
>> container for performance/disk reasons, and since then the issue started to
>> arise. I’m wondering if there’s a bug with the way spark looks for shuffle
>> files, and one of the implementations isn’t obeying the path properly?
>>
>> I don’t want to set "spark.local.dir” because that requires the driver
>> also have this directory set up, which is not the case.
>>
>> Has anyone seen this issue before?
>>
>> 
>>
>> 15/11/18 11:32:27 ERROR storage.ShuffleBlockFetcherIterator: Failed to
>> get block(s) from XXX:50777
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /mnt/mesos/sandbox/spark-tmp/blockmgr-7a5b85c8-7c5f-43c2-b9e2-2bc0ffdb902d/23/shuffle_2_94_0.index
>> (No such file or directory)
>>at java.io.FileInputStream.open(Native Method)
>>at java.io.FileInputStream.(FileInputStream.java:146)
>>at
>> org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:98)
>>at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:300)
>>at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>>at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>>at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>>at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>>at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChann

Re: Some spark apps fail with "All masters are unresponsive", while others pass normally

2015-11-09 Thread Romi Kuntsman
If they had a problem managing memory, wouldn't there be an OOM?
Why does AppClient throw an NPE?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 9, 2015 at 4:59 PM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> Is that all you have in the executor logs? I suspect some of those jobs
> are having a hard time managing  the memory.
>
> Thanks
> Best Regards
>
> On Sun, Nov 1, 2015 at 9:38 PM, Romi Kuntsman <r...@totango.com> wrote:
>
>> [adding dev list since it's probably a bug, but i'm not sure how to
>> reproduce so I can open a bug about it]
>>
>> Hi,
>>
>> I have a standalone Spark 1.4.0 cluster with 100s of applications running
>> every day.
>>
>> From time to time, the applications crash with the following error (see
>> below)
>> But at the same time (and also after that), other applications are
>> running, so I can safely assume the master and workers are working.
>>
>> 1. Why is there a NullPointerException? (I can't track the Scala stack
>> trace to the code, but an NPE is usually an obvious bug even if there's
>> actually a network error...)
>> 2. Why can't it connect to the master? (If it's a network timeout, how do I
>> increase it? I see the values are hardcoded inside AppClient.)
>> 3. How do I recover from this error?
>>
>>
>>   ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application
>> has been killed. Reason: All masters are unresponsive! Giving up. ERROR
>>   ERROR 01-11 15:32:55,087  OneForOneStrategy - ERROR
>> logs/error.log
>>   java.lang.NullPointerException NullPointerException
>>   at
>> org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
>>   at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>   at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>   at
>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>   at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
>>   at
>> org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
>>   at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>   at
>> org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>   at
>> org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>   at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>   at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>   ERROR 01-11 15:32:55,603   SparkContext - Error
>> initializing SparkContext. ERROR
>>   java.lang.IllegalStateException: Cannot call methods on a stopped
>> SparkContext
>>   at org.apache.spark.SparkContext.org
>> $apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
>>   at
>> org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
>>   at
>> org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
>>   at org.apache.spark.SparkContext.(SparkContext.scala:543)
>>   at
>> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
>>
>>
>> Thanks!
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>
>


Re: JMX with Spark

2015-11-05 Thread Romi Kuntsman
Have you read this?
https://spark.apache.org/docs/latest/monitoring.html

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Nov 5, 2015 at 2:08 PM, Yogesh Vyas <informy...@gmail.com> wrote:

> Hi,
> How we can use JMX and JConsole to monitor our Spark applications?
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Romi Kuntsman
I noticed that toJavaRDD causes a computation on the DataFrame, so is it
considered an action, even though logically it's a transformation?
On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" 
wrote:

> Hello folks,
>
> Recently I have noticed unexpectedly big network traffic between Driver
> Program and Worker node.
> During debugging I have figured out that it is caused by following block
> of code
>
> —— Java ——— —
> DataFrame etpvRecords = context.sql(" SOME SQL query here");
> Mapper m = new Mapper(localValue, ProgramId::toProgId);
> return etpvRecords
> .toJavaRDD()
> .map(m::mapHutPutViewingRow)
> .reduce(Reducer::reduce);
> —— Java 
>
> I’m using debug breakpoint and OS X nettop to monitor traffic between
> processes. So before approaching line toJavaRDD() I have 500Kb of traffic
> and after executing this line I have 2.2 Mb of traffic. But when I check
> size of result of reduce function it is 10 Kb.
> So .toJavaRDD() seems causing worker process return dataset to driver
> process and seems further map/reduce occurs on Driver.
>
> This is definitely not expected by me, so I have 2 questions.
> 1.  Is it really expected behavior that DataFrame.toJavaRDD cause whole
> dataset return to driver or I’m doing something wrong?
> 2.  What is expected way to perform transformation with DataFrame using
> custom Java map\reduce functions in case if standard SQL features are not
> fit all my needs?
>
> Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same
> machine). Java 1.8.0_60.
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
>


Re: DataFrame.toJavaRDD cause fetching data to driver, is it expected ?

2015-11-04 Thread Romi Kuntsman
In my program I move between RDD and DataFrame several times.
I know that the entire data of the DF doesn't go into the driver because it
wouldn't fit there.
But calling toJavaRDD does cause computation.

Check the number of partitions you have on the DF and RDD...
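For example (a sketch, assuming the DataFrame is called df):

System.out.println("DF partitions:  " + df.rdd().partitions().length);
System.out.println("RDD partitions: " + df.toJavaRDD().partitions().size());

// If the counts are low relative to the data size, repartition before the heavy work;
// 200 here is just an example value.
JavaRDD<Row> rows = df.toJavaRDD().repartition(200);
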
On Nov 4, 2015 7:54 PM, "Aliaksei Tsyvunchyk" <atsyvunc...@exadel.com>
wrote:

> Hello Romi,
>
> Do you mean that in my particular case I’m causing computation on
> dataFrame or it is regular behavior of DataFrame.toJavaRDD ?
> If it’s regular behavior, do you know which approach could be used to
> perform make/reduce on dataFrame without causing it to load all data to
> driver program ?
>
> On Nov 4, 2015, at 12:34 PM, Romi Kuntsman <r...@totango.com> wrote:
>
> I noticed that toJavaRDD causes a computation on the DataFrame, so is it
> considered an action, even though logically it's a transformation?
> On Nov 4, 2015 6:51 PM, "Aliaksei Tsyvunchyk" <atsyvunc...@exadel.com>
> wrote:
>
>> Hello folks,
>>
>> Recently I have noticed unexpectedly big network traffic between Driver
>> Program and Worker node.
>> During debugging I have figured out that it is caused by following block
>> of code
>>
>> —— Java ——— —
>> DataFrame etpvRecords = context.sql(" SOME SQL query here");
>> Mapper m = new Mapper(localValue, ProgramId::toProgId);
>> return etpvRecords
>> .toJavaRDD()
>> .map(m::mapHutPutViewingRow)
>> .reduce(Reducer::reduce);
>> —— Java 
>>
>> I’m using debug breakpoint and OS X nettop to monitor traffic between
>> processes. So before approaching line toJavaRDD() I have 500Kb of traffic
>> and after executing this line I have 2.2 Mb of traffic. But when I check
>> size of result of reduce function it is 10 Kb.
>> So .toJavaRDD() seems causing worker process return dataset to driver
>> process and seems further map/reduce occurs on Driver.
>>
>> This is definitely not expected by me, so I have 2 questions.
>> 1.  Is it really expected behavior that DataFrame.toJavaRDD cause whole
>> dataset return to driver or I’m doing something wrong?
>> 2.  What is expected way to perform transformation with DataFrame using
>> custom Java map\reduce functions in case if standard SQL features are not
>> fit all my needs?
>>
>> Env: Spark 1.5.1 in standalone mode, (1 master, 1 worker sharing same
>> machine). Java 1.8.0_60.
>>
>> CONFIDENTIALITY NOTICE: This email and files attached to it are
>> confidential. If you are not the intended recipient you are hereby notified
>> that using, copying, distributing or taking any action in reliance on the
>> contents of this information is strictly prohibited. If you have received
>> this email in error please notify the sender and delete this email.
>>
>
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
>


Re: Error : - No filesystem for scheme: spark

2015-11-02 Thread Romi Kuntsman
except "spark.master", do you have "spark://" anywhere in your code or
config files?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Nov 2, 2015 at 11:27 AM, Balachandar R.A. <balachandar...@gmail.com>
wrote:

>
> -- Forwarded message --
> From: "Balachandar R.A." <balachandar...@gmail.com>
> Date: 02-Nov-2015 12:53 pm
> Subject: Re: Error : - No filesystem for scheme: spark
> To: "Jean-Baptiste Onofré" <j...@nanthrax.net>
> Cc:
>
> > HI JB,
> > Thanks for the response,
> > Here is the content of my spark-defaults.conf
> >
> >
> > # Default system properties included when running spark-submit.
> > # This is useful for setting default environmental settings.
> >
> > # Example:
> >  spark.master spark://fdoat:7077
> > # spark.eventLog.enabled   true
> >  spark.eventLog.dir/home/bala/spark-logs
> > # spark.eventLog.dir   hdfs://namenode:8021/directory
> > # spark.serializer
> org.apache.spark.serializer.KryoSerializer
> > # spark.driver.memory  5g
> > # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value
> -Dnumbers="one two three"
> >
> >
> > regards
> > Bala
>
> >
> > On 2 November 2015 at 12:21, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> >>
> >> Hi,
> >>
> >> do you have something special in conf/spark-defaults.conf (especially
> on the eventLog directory) ?
> >>
> >> Regards
> >> JB
> >>
> >>
> >> On 11/02/2015 07:48 AM, Balachandar R.A. wrote:
> >>>
> >>> Can someone tell me at what point this error could come?
> >>>
> >>> In one of my use cases, I am trying to use hadoop custom input format.
> >>> Here is my code.
> >>>
> >>> val hConf: Configuration = sc.hadoopConfiguration
> >>> hConf.set("fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
> >>> hConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
> >>> var job = new Job(hConf)
> >>> FileInputFormat.setInputPaths(job, new Path("hdfs:///user/bala/MyBinaryFile"));
> >>> var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat],
> >>>   classOf[IntWritable], classOf[BytesWritable], job.getConfiguration())
> >>> val count = hRDD.mapPartitionsWithInputSplit { (split, iter) => myfuncPart(split, iter) }
> >>>
> >>> The moment I invoke the mapPartitionsWithInputSplit() method, I get the
> >>> below error in my spark-submit launch:
> >>>
> >>> 15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
> >>> (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
> >>> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
> >>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> >>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> >>>
> >>> Any help here to move towards fixing this will be of great help
> >>>
> >>>
> >>>
> >>> Thanks
> >>>
> >>> Bala
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >> -
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
>
>


Some spark apps fail with "All masters are unresponsive", while others pass normally

2015-11-01 Thread Romi Kuntsman
[adding dev list since it's probably a bug, but i'm not sure how to
reproduce so I can open a bug about it]

Hi,

I have a standalone Spark 1.4.0 cluster with 100s of applications running
every day.

From time to time, the applications crash with the following error (see
below)
But at the same time (and also after that), other applications are running,
so I can safely assume the master and workers are working.

1. Why is there a NullPointerException? (I can't track the Scala stack
trace to the code, but an NPE is usually an obvious bug even if there's
actually a network error...)
2. Why can't it connect to the master? (If it's a network timeout, how do I
increase it? I see the values are hardcoded inside AppClient.)
3. How do I recover from this error?


  ERROR 01-11 15:32:54,991 SparkDeploySchedulerBackend - Application has
been killed. Reason: All masters are unresponsive! Giving up. ERROR
  ERROR 01-11 15:32:55,087  OneForOneStrategy - ERROR
logs/error.log
  java.lang.NullPointerException NullPointerException
  at
org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
  at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
  at
org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
  at
org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
  at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  ERROR 01-11 15:32:55,603   SparkContext - Error
initializing SparkContext. ERROR
  java.lang.IllegalStateException: Cannot call methods on a stopped
SparkContext
  at org.apache.spark.SparkContext.org
$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
  at
org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
  at
org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
  at org.apache.spark.SparkContext.(SparkContext.scala:543)
  at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)


Thanks!

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com


Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Romi Kuntsman
Did you try to cache a DataFrame with just a single row?
Do you rows have any columns with null values?
Can you post a code snippet here on how you load/generate the dataframe?
Does dataframe.rdd.cache work?

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
wrote:

> It is not a problem to use JavaRDD.cache() for 200M data (all Objects read
> form Json Format). But when I try to use DataFrame.cache(), It shown
> exception in below.
>
> My machine can cache 1 G data in Avro format without any problem.
>
> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>
> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
> 27.832369 ms
>
> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID
> 1)
>
> java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:497)
>
> at
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
> SQLContext.scala:500)
>
> at
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
> SQLContext.scala:500)
>
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
>
> at scala.collection.TraversableLike$$anonfun$map$1.apply(
> TraversableLike.scala:244)
>
> at scala.collection.IndexedSeqOptimized$class.foreach(
> IndexedSeqOptimized.scala:33)
>
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
> SQLContext.scala:500)
>
> at org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(
> SQLContext.scala:498)
>
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
> InMemoryColumnarTableScan.scala:127)
>
> at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(
> InMemoryColumnarTableScan.scala:120)
>
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278
> )
>
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
>
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38
> )
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38
> )
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/29 13:26:23 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
> localhost): java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>
>
> Thanks,
>
>
> Jingyu
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.


Re: NullPointerException when cache DataFrame in Java (Spark1.5.1)

2015-10-29 Thread Romi Kuntsman
>
> BUT, after change limit(500) to limit(1000). The code report
> NullPointerException.
>
I had a similar situation, and the problem was with a certain record.
Try to find which records are returned when you limit to 1000 but not
returned when you limit to 500.

Could it be an NPE thrown from PixelObject?
Are you running Spark with master=local, so it's running inside your IDE
and you can see the errors from the driver and the workers?
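To isolate the offending record, something like the sketch below might help
(it reuses the schemaPixel DataFrame from your snippet; note that limit() gives
no ordering guarantee, so treat this only as a heuristic):

DataFrame first500  = schemaPixel.select(schemaPixel.col("domain")).limit(500);
DataFrame first1000 = schemaPixel.select(schemaPixel.col("domain")).limit(1000);
// Rows that appear only in the larger sample are candidates for the bad record:
for (Row r : first1000.except(first500).collectAsList()) {
    System.out.println(r);
}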


*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Thu, Oct 29, 2015 at 10:04 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
wrote:

> Thanks Romi,
>
> I resize the dataset to 7MB, however, the code show NullPointerException
>  exception as well.
>
> Did you try to cache a DataFrame with just a single row?
>
> Yes, I tried. But, Same problem.
> .
> Do you rows have any columns with null values?
>
> No, I had filter out null values before cache the dataframe.
>
> Can you post a code snippet here on how you load/generate the dataframe?
>
> Sure, Here is the working code 1:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator()).cache();
>
> System.out.println(pixels.count()); // 3000-4000 rows
>
> Working code 2:
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
>
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.
> class);
>
> DataFrame totalDF1 = 
> schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
> is not null").limit(500);
>
> System.out.println(totalDF1.count());
>
>
> BUT, after change limit(500) to limit(1000). The code report
> NullPointerException.
>
>
> JavaRDD pixels = pixelsStr.map(new PixelGenerator());
>
> DataFrame schemaPixel = sqlContext.createDataFrame(pixels, PixelObject.
> class);
>
> DataFrame totalDF = 
> schemaPixel.select(schemaPixel.col("domain")).filter("'domain'
> is not null").limit(*1000*);
>
> System.out.println(totalDF.count()); // problem at this line
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
>
> 15/10/29 18:56:28 INFO TaskSchedulerImpl: Cancelling stage 0
>
> 15/10/29 18:56:28 INFO DAGScheduler: ShuffleMapStage 0 (count at
> X.java:113) failed in 3.764 s
>
> 15/10/29 18:56:28 INFO DAGScheduler: Job 0 failed: count at XXX.java:113,
> took 3.862207 s
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.NullPointerException
>
> at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
> Does dataframe.rdd.cache work?
>
> No, I tried but same exception.
>
> Thanks,
>
> Jingyu
>
> On 29 October 2015 at 17:38, Romi Kuntsman <r...@totango.com> wrote:
>
>> Did you try to cache a DataFrame with just a single row?
>> Do your rows have any columns with null values?
>> Can you post a code snippet here on how you load/generate the dataframe?
>> Does dataframe.rdd.cache work?
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Thu, Oct 29, 2015 at 4:33 AM, Zhang, Jingyu <jingyu.zh...@news.com.au>
>> wrote:
>>
>>> It is not a problem to use JavaRDD.cache() for 200M data (all Objects
>>> read form Json Format). But when I try to use DataFrame.cache(), It shown
>>> exception in below.
>>>
>>> My machine can cache 1 G data in Avro format without any problem.
>>>
>>> 15/10/29 13:26:23 INFO GeneratePredicate: Code generated in 154.531827 ms
>>>
>>> 15/10/29 13:26:23 INFO GenerateUnsafeProjection: Code generated in
>>> 27.832369 ms
>>>
>>> 15/10/29 13:26:23 ERROR Executor: Exception in task 0.0 in stage 1.0
>>> (TID 1)
>>>
>>> java.lang.NullPointerException
>>>
>>> at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>>>
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>> DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>
>>> at
>>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>>> SQLContext.scala:500)
>>>
>>> at
>>> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.apply(
>>> SQLContext.scala:500)
>>>
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>>> TraversableLike.scala:244)
>>>
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>>> Tra

Re: How to get a new RDD by ordinarily subtract its adjacent rows

2015-09-21 Thread Romi Kuntsman
An RDD is a set of data rows (in your case numbers); there is no inherent
meaning to the order of the items.
What exactly are you trying to accomplish?
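If the order comes from how the data was read (e.g. line order in a file), one
common trick is to index the rows and join each row with its predecessor. A
rough sketch in Java, assuming a JavaRDD<Double> called values:

JavaPairRDD<Long, Double> indexed = values.zipWithIndex()
        .mapToPair(t -> new Tuple2<Long, Double>(t._2(), t._1()));     // (index, value)
JavaPairRDD<Long, Double> shifted = indexed
        .mapToPair(t -> new Tuple2<Long, Double>(t._1() + 1, t._2())); // (index+1, previous value)
JavaRDD<Double> diffs = indexed.join(shifted)       // index 0 has no predecessor, so it drops out
        .values()
        .map(pair -> pair._1() - pair._2());        // current minus previous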

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:29 PM, Zhiliang Zhu <zchl.j...@yahoo.com.invalid>
wrote:

> Dear ,
>
> I have took lots of days to think into this issue, however, without any
> success...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, I would like get a new RDD rdd2, each row
> in rdd2[ i ] = rdd1[ i ] - rdd[i - 1] .
> What kinds of API or function would I use...
>
>
> Thanks very much!
> John
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
The SparkContext is available on the driver, not on the executors.

To read from Cassandra, you can use something like this:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md
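For example, with the connector's Java API, reading a table into an RDD that is
computed on the workers looks roughly like this (the keyspace, table and Event
bean are made-up placeholders, and jsc is your JavaSparkContext):

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;

JavaRDD<Event> events = javaFunctions(jsc)
        .cassandraTable("my_keyspace", "events", mapRowTo(Event.class));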

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 2:27 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> can i use this sparkContext on executors ??
> In my application, i have scenario of reading from db for certain records
> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>
> If sparkContext couldn't be sent to executors , what is the workaround for
> this ??
>
> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> add @transient?
>>
>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Hello All,
>>>
>>> How can i pass sparkContext as a parameter to a method in an object.
>>> Because passing sparkContext is giving me TaskNotSerializable Exception.
>>>
>>> How can i achieve this ?
>>>
>>> Thanks,
>>> Padma Ch
>>>
>>
>>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
foreach is something that runs on the driver, not the workers.

if you want to perform some function on each record from cassandra, you
need to do cassandraRdd.map(func), which will run distributed on the spark
workers

*Romi Kuntsman*, *Big Data Engineer*
http://www.totango.com

On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch <learnings.chitt...@gmail.com>
wrote:

> Yes, but i need to read from cassandra db within a spark
> transformation..something like..
>
> dstream.forachRDD{
>
> rdd=> rdd.foreach {
>  message =>
>  sc.cassandraTable()
>   .
>   .
>   .
> }
> }
>
> Since rdd.foreach gets executed on workers, how can i make sparkContext
> available on workers ???
>
> Regards,
> Padma Ch
>
> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> You can use broadcast variable for passing connection information.
>>
>> Cheers
>>
>> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>> can i use this sparkContext on executors ??
>> In my application, i have scenario of reading from db for certain records
>> in rdd. Hence I need sparkContext to read from DB (cassandra in our case),
>>
>> If sparkContext couldn't be sent to executors , what is the workaround
>> for this ??
>>
>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> add @transient?
>>>
>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <learnings.chitt...@gmail.com
>>> > wrote:
>>>
>>>> Hello All,
>>>>
>>>> How can i pass sparkContext as a parameter to a method in an
>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>> Exception.
>>>>
>>>> How can i achieve this ?
>>>>
>>>> Thanks,
>>>> Padma Ch
>>>>
>>>
>>>
>>
>


Re: how to get RDD from two different RDDs with cross column

2015-09-21 Thread Romi Kuntsman
Hi,
If I understand correctly:
rdd1 contains keys (of type StringDate)
rdd2 contains keys and values
and rdd3 contains all the keys, and the values from rdd2?

I think you should make rdd1 and rdd2 PairRDDs, and then use an outer join.
Does that make sense?
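A rough sketch in Java (the types are assumptions based on your description:
keys are date strings, rdd2 carries float values):

JavaPairRDD<String, Float> keyed1 = rdd1.mapToPair(d -> new Tuple2<String, Float>(d, null));
JavaPairRDD<String, Float> rdd3 = keyed1.leftOuterJoin(rdd2)      // rdd2 = (date, value) pairs
        .mapValues(v -> v._2().orNull());  // value from rdd2 where the date exists, otherwise null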

On Mon, Sep 21, 2015 at 8:37 PM Zhiliang Zhu  wrote:

> Dear Romi, Priya, Sujt and Shivaram and all,
>
> I have took lots of days to think into this issue, however, without  any
> enough good solution...
> I shall appreciate your all kind help.
>
> There is an RDD rdd1, and another RDD rdd2,
> (rdd2 can be PairRDD, or DataFrame with two columns as ).
> StringDate column values from rdd1 and rdd2 are cross but not the same.
>
> I would like to get a new RDD rdd3, StringDate in rdd3
> would be all from (same) as rdd1, and float in rdd3 would be from rdd2 if
> its
> StringDate is in rdd2, or else NULL would be assigned.
> each row in rdd3[ i ] = ,
> rdd2[i].StringDate would be same as rdd1[ i ].StringDate,
> then rdd2[ i ].float is assigned rdd3[ i ] StringDate part.
> What kinds of API or function would I use...
>
> Thanks very much!
> Zhiliang
>
>
>


Re: passing SparkContext as parameter

2015-09-21 Thread Romi Kuntsman
Cody, that's a great reference!
As shown there, the best way to connect to an external database from the
workers is to create a connection pool on (each) worker.
The driver can pass, via broadcast, the connection string, but not the
connection object itself and not the SparkContext.
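In code, the pattern from that guide looks roughly like this (ConnectionPool,
Connection and write() are placeholders for whatever client you use, and jsc is
your JavaSparkContext):

Broadcast<String> connStr = jsc.broadcast("db-host:9042");   // config only, never a live connection

someRdd.foreachPartition(records -> {
    // One connection per partition, created on the worker itself
    Connection conn = ConnectionPool.get(connStr.value());
    while (records.hasNext()) {
        conn.write(records.next());
    }
    ConnectionPool.release(conn);
});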

On Mon, Sep 21, 2015 at 5:31 PM Cody Koeninger <c...@koeninger.org> wrote:

> That isn't accurate, I think you're confused about foreach.
>
> Look at
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
>
> On Mon, Sep 21, 2015 at 7:36 AM, Romi Kuntsman <r...@totango.com> wrote:
>
>> foreach is something that runs on the driver, not the workers.
>>
>> if you want to perform some function on each record from cassandra, you
>> need to do cassandraRdd.map(func), which will run distributed on the spark
>> workers
>>
>> *Romi Kuntsman*, *Big Data Engineer*
>> http://www.totango.com
>>
>> On Mon, Sep 21, 2015 at 3:29 PM, Priya Ch <learnings.chitt...@gmail.com>
>> wrote:
>>
>>> Yes, but i need to read from cassandra db within a spark
>>> transformation..something like..
>>>
>>> dstream.forachRDD{
>>>
>>> rdd=> rdd.foreach {
>>>  message =>
>>>  sc.cassandraTable()
>>>   .
>>>   .
>>>   .
>>> }
>>> }
>>>
>>> Since rdd.foreach gets executed on workers, how can i make sparkContext
>>> available on workers ???
>>>
>>> Regards,
>>> Padma Ch
>>>
>>> On Mon, Sep 21, 2015 at 5:10 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> You can use broadcast variable for passing connection information.
>>>>
>>>> Cheers
>>>>
>>>> On Sep 21, 2015, at 4:27 AM, Priya Ch <learnings.chitt...@gmail.com>
>>>> wrote:
>>>>
>>>> can i use this sparkContext on executors ??
>>>> In my application, i have scenario of reading from db for certain
>>>> records in rdd. Hence I need sparkContext to read from DB (cassandra in our
>>>> case),
>>>>
>>>> If sparkContext couldn't be sent to executors , what is the workaround
>>>> for this ??
>>>>
>>>> On Mon, Sep 21, 2015 at 3:06 PM, Petr Novak <oss.mli...@gmail.com>
>>>> wrote:
>>>>
>>>>> add @transient?
>>>>>
>>>>> On Mon, Sep 21, 2015 at 11:27 AM, Priya Ch <
>>>>> learnings.chitt...@gmail.com> wrote:
>>>>>
>>>>>> Hello All,
>>>>>>
>>>>>> How can i pass sparkContext as a parameter to a method in an
>>>>>> object. Because passing sparkContext is giving me TaskNotSerializable
>>>>>> Exception.
>>>>>>
>>>>>> How can i achieve this ?
>>>>>>
>>>>>> Thanks,
>>>>>> Padma Ch
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


How to determine the value for spark.sql.shuffle.partitions?

2015-09-01 Thread Romi Kuntsman
Hi all,

The number of partition greatly affect the speed and efficiency of
calculation, in my case in DataFrames/SparkSQL on Spark 1.4.0.

Too few partitions with large data cause OOM exceptions.
Too many partitions on small data cause a delay due to overhead.

How do you programmatically determine the optimal number of partitions and
cores in Spark, as a function of:

   1. available memory per core
   2. number of records in input data
   3. average/maximum record size
   4. cache configuration
   5. shuffle configuration
   6. serialization
   7. etc?

Any general best practices?

Thanks!

Romi K.


Re: How to remove worker node but let it finish first?

2015-08-29 Thread Romi Kuntsman
Is it only available in Mesos?
I'm using a Spark standalone cluster; is there anything similar there?

On Fri, Aug 28, 2015 at 8:51 AM Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can create a custom mesos framework for your requirement, to get you
 started you can check this out
 http://mesos.apache.org/documentation/latest/app-framework-development-guide/

 Thanks
 Best Regards

 On Mon, Aug 24, 2015 at 12:11 PM, Romi Kuntsman r...@totango.com wrote:

 Hi,
 I have a spark standalone cluster with 100s of applications per day, and
 it changes size (more or less workers) at various hours. The driver runs on
 a separate machine outside the spark cluster.

 When a job is running and its worker is killed (because at that hour the
 number of workers is reduced), it sometimes fails, instead of
 redistributing the work to other workers.

 How is it possible to decommission a worker, so that it doesn't receive
 any new work, but does finish all existing work before shutting down?

 Thanks!





Re: Exception when S3 path contains colons

2015-08-25 Thread Romi Kuntsman
Hello,

We had the same problem. I've written a blog post with the detailed
explanation and workaround:

http://labs.totango.com/spark-read-file-with-colon/

Greetings,
Romi K.

On Tue, Aug 25, 2015 at 2:47 PM Gourav Sengupta gourav.sengu...@gmail.com
wrote:

 I am not quite sure about this but should the notation not be 
 s3n://redactedbucketname/*
 instead of
 s3a://redactedbucketname/*

 The best way is to use s3://bucketname/path/*


 Regards,
 Gourav

 On Tue, Aug 25, 2015 at 10:35 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can change the names, whatever program that is pushing the record
 must follow the naming conventions. Try to replace : with _ or something.

 Thanks
 Best Regards

 On Tue, Aug 18, 2015 at 10:20 AM, Brian Stempin brian.stem...@gmail.com
 wrote:

 Hi,
 I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0).  I'm seeing
 the exception below when encountering file names that contain colons.  Any
 idea on how to get around this?

 scala val files = sc.textFile(s3a://redactedbucketname/*)

 2015-08-18 04:38:34,567 INFO  [main] storage.MemoryStore
 (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with
 curMem=669367, maxMem=285203496

 2015-08-18 04:38:34,568 INFO  [main] storage.MemoryStore
 (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory
 (estimated size 236.5 KB, free 271.1 MB)

 2015-08-18 04:38:34,663 INFO  [main] storage.MemoryStore
 (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with
 curMem=911591, maxMem=285203496

 2015-08-18 04:38:34,664 INFO  [main] storage.MemoryStore
 (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in
 memory (estimated size 21.0 KB, free 271.1 MB)

 2015-08-18 04:38:34,665 INFO
  [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo
 (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on
 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB)

 2015-08-18 04:38:34,667 INFO  [main] spark.SparkContext
 (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at
 console:21

 files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at
 textFile at console:21


 scala files.count

 2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
 (S3AFileSystem.java:listStatus(533)) - List status for path:
 s3a://redactedbucketname/

 2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
 (S3AFileSystem.java:getFileStatus(684)) - Getting path status for
 s3a://redactedbucketname/ ()

 java.lang.IllegalArgumentException: java.net.URISyntaxException:
 Relative path in absolute URI:
 [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv

 at org.apache.hadoop.fs.Path.initialize(Path.java:206)

 at org.apache.hadoop.fs.Path.init(Path.java:172)

 at org.apache.hadoop.fs.Path.init(Path.java:94)

 at org.apache.hadoop.fs.Globber.glob(Globber.java:240)

 at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)

 at
 org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)

 at
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)

 at
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)

 at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)

 at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)

 at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

 at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)

 at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)

 at scala.Option.getOrElse(Option.scala:120)

 at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)

 at org.apache.spark.rdd.RDD.count(RDD.scala:1099)

 at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24)

 at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29)

 at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31)

 at $iwC$iwC$iwC$iwC$iwC.init(console:33)

 at $iwC$iwC$iwC$iwC.init(console:35)

 at $iwC$iwC$iwC.init(console:37)

 at $iwC$iwC.init(console:39)

 at $iwC.init(console:41)

 at init(console:43)

 at .init(console:47)

 at .clinit(console)

 at .init(console:7)

 at .clinit(console)

 at $print(console)

 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)

 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)

 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)

 at 

How to remove worker node but let it finish first?

2015-08-24 Thread Romi Kuntsman
Hi,
I have a spark standalone cluster with 100s of applications per day, and it
changes size (more or less workers) at various hours. The driver runs on a
separate machine outside the spark cluster.

When a job is running and its worker is killed (because at that hour the
number of workers is reduced), it sometimes fails, instead of
redistributing the work to other workers.

How is it possible to decommission a worker, so that it doesn't receive any
new work, but does finish all existing work before shutting down?

Thanks!


Re: How to overwrite partition when writing Parquet?

2015-08-20 Thread Romi Kuntsman
Cheng - what if I want to overwrite a specific partition?

I'll try removing the folder, as Hemant suggested...
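For the record, a sketch of that workaround in Java: delete only the directory
of the partition being rewritten with the Hadoop FileSystem API, then append the
new data (the date value is illustrative, and jsc is the JavaSparkContext):

FileSystem fs = FileSystem.get(jsc.hadoopConfiguration());
fs.delete(new Path(parquetDir + "/date=2015-08-20"), true);   // recursive delete of one partition

myDataframe.write()
        .partitionBy("date")
        .mode(SaveMode.Append)
        .parquet(parquetDir);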

On Thu, Aug 20, 2015 at 1:17 PM Cheng Lian lian.cs@gmail.com wrote:

 You can apply a filter first to filter out data of needed dates and then
 append them.


 Cheng


 On 8/20/15 4:59 PM, Hemant Bhanawat wrote:

 How can I overwrite only a given partition or manually remove a partition
 before writing?

 I don't know if (and I don't think)  there is a way to do that using a
 mode. But doesn't manually deleting the directory of a particular partition
 help? For directory structure, check this out...


 http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


 On Wed, Aug 19, 2015 at 8:18 PM, Romi Kuntsman r...@totango.com wrote:

 Hello,

 I have a DataFrame, with a date column which I want to use as a partition.
 Each day I want to write the data for the same date in Parquet, and then
 read a dataframe for a date range.

 I'm using:

 myDataframe.write().partitionBy(date).mode(SaveMode.Overwrite).parquet(parquetDir);

 If I use SaveMode.Append, then writing data for the same partition adds
 the same data there again.
 If I use SaveMode.Overwrite, then writing data for a single partition
 removes all the data for all partitions.

 How can I overwrite only a given partition or manually remove a partition
 before writing?

 Many thanks!
 Romi K.






How to overwrite partition when writing Parquet?

2015-08-19 Thread Romi Kuntsman
Hello,

I have a DataFrame, with a date column which I want to use as a partition.
Each day I want to write the data for the same date in Parquet, and then
read a dataframe for a date range.

I'm using:
myDataframe.write().partitionBy(date).mode(SaveMode.Overwrite).parquet(parquetDir);

If I use SaveMode.Append, then writing data for the same partition adds the
same data there again.
If I use SaveMode.Overwrite, then writing data for a single partition
removes all the data for all partitions.

How can I overwrite only a given partition or manually remove a partition
before writing?

Many thanks!
Romi K.


Re: How to minimize shuffling on Spark dataframe Join?

2015-08-19 Thread Romi Kuntsman
If you create a PairRDD from the DataFrame, using
dataFrame.toJavaRDD().mapToPair(), then you can call
partitionBy(someCustomPartitioner), which will partition the RDD by the key
(of the pair).
Then the operations on it (like joining with another RDD) will consider
this partitioning.
I'm not sure that DataFrames already support this.
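A rough sketch of that approach in Java (the DataFrame names, the key column
index and the partition count are assumptions):

int numPartitions = 200;

JavaPairRDD<String, Row> students = studentDf.toJavaRDD()
        .mapToPair(row -> new Tuple2<String, Row>(row.getString(0), row))  // key by studentid
        .partitionBy(new HashPartitioner(numPartitions));

JavaPairRDD<String, Row> results = resultDf.toJavaRDD()
        .mapToPair(row -> new Tuple2<String, Row>(row.getString(0), row))
        .partitionBy(new HashPartitioner(numPartitions));

// Both sides now share the same partitioner, so the join itself should not reshuffle.
JavaPairRDD<String, Tuple2<Row, Row>> joined = students.join(results);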

On Wed, Aug 12, 2015 at 11:16 AM Abdullah Anwar 
abdullah.ibn.an...@gmail.com wrote:

 Hi Hemant,

 Thank you for your replay.

 I think source of my dataframe is not partitioned on key, its an avro
 file where 'id' is a field .. but I don't know how to read a file and at
 the same time configure partition key. I couldn't find  anything on
 SQLContext.read.load where you can set partition key. or in dataframe where
 you can set partition key. If it could partition the on the specified key
 .. will spark put the same partition range on same machine for two
 different dataframe??

What are the overall tips to join faster?

 Best Regards,
 Abdullah




 On Wed, Aug 12, 2015 at 11:02 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Is the source of your dataframe partitioned on key? As per your mail, it
 looks like it is not. If that is the case,  for partitioning the data, you
 will have to shuffle the data anyway.

 Another part of your question is - how to co-group data from two
 dataframes based on a key? I think for RDD's cogroup in PairRDDFunctions is
 a way. I am not sure if something similar is available for DataFrames.

 Hemant





 On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar 
 abdullah.ibn.an...@gmail.com wrote:



 I have two dataframes like this

   student_rdf = (studentid, name, ...)
   student_result_rdf = (studentid, gpa, ...)

 we need to join this two dataframes. we are now doing like this,

 student_rdf.join(student_result_rdf, student_result_rdf[studentid] == 
 student_rdf[studentid])

 So it is simple. But it creates lots of data shuffling across worker
 nodes, but as joining key is similar and if the dataframe could (understand
 the partitionkey) be partitioned using that key (studentid) then there
 suppose not to be any shuffling at all. As similar data (based on partition
 key) would reside in similar node. is it possible, to hint spark to do this?

 So, I am finding the way to partition data based on a column while I
 read a dataframe from input. And If it is possible that Spark would
 understand that two partitionkey of two dataframes are similar, then how?




 --
 Abdullah





 --
 Abdullah



Re: Issues with S3 paths that contain colons

2015-08-19 Thread Romi Kuntsman
I had the exact same issue, and overcame it by overriding
NativeS3FileSystem with my own class, where I replaced the implementation
of globStatus. It's a hack but it works.
Then I set the hadoop config fs.myschema.impl to my class name, and
accessed the files through myschema:// instead of s3n://

@Override
public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter)
throws IOException {
  final FileStatus[] statusList = super.listStatus(pathPattern);
  final List<FileStatus> result = Lists.newLinkedList();
  for (FileStatus fileStatus : statusList) {
if (filter.accept(fileStatus.getPath())) {
  result.add(fileStatus);
}
  }
  return result.toArray(new FileStatus[] {});
}
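The registration side is then just a Hadoop config entry plus using the new
scheme (the class and bucket names here are illustrative):

sc.hadoopConfiguration().set("fs.myschema.impl", "com.example.ColonSafeS3FileSystem");
JavaRDD<String> lines = sc.textFile("myschema://my-bucket/some/prefix/*");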



On Wed, Aug 19, 2015 at 9:14 PM Steve Loughran ste...@hortonworks.com
wrote:

 you might want to think about filing a JIRA on issues.apache.org against
 HADOOP here, component being fs/s3. That doesn't mean it is fixable, only
 known.

 Every FS has its own set of forbidden characters and filenames; Unix doesn't
 allow files named "."; Windows doesn't allow files called COM1, ..., so hitting
 some filesystem rule is sometimes a problem. Here, though, you've got the
 file in S3, the listing finds it, but other bits of the codepath are
 failing -which implies that it is something in the Hadoop libs.


  On 18 Aug 2015, at 08:20, Brian Stempin brian.stem...@gmail.com wrote:
 
  Hi,
  I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0).  I'm seeing
 the
  exception below when encountering file names that contain colons.  Any
 idea
  on how to get around this?
 
  scala val files = sc.textFile(s3a://redactedbucketname/*)
 
  2015-08-18 04:38:34,567 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with
  curMem=669367, maxMem=285203496
 
  2015-08-18 04:38:34,568 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in
 memory
  (estimated size 236.5 KB, free 271.1 MB)
 
  2015-08-18 04:38:34,663 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with
  curMem=911591, maxMem=285203496
 
  2015-08-18 04:38:34,664 INFO  [main] storage.MemoryStore
  (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in
  memory (estimated size 21.0 KB, free 271.1 MB)
 
  2015-08-18 04:38:34,665 INFO
 [sparkDriver-akka.actor.default-dispatcher-19]
  storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added
  broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB,
 free:
  271.9 MB)
 
  2015-08-18 04:38:34,667 INFO  [main] spark.SparkContext
  (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at
  console:21
 
  files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at
 textFile at
  console:21
 
 
  scala files.count
 
  2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
  (S3AFileSystem.java:listStatus(533)) - List status for path:
  s3a://redactedbucketname/
 
  2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
  (S3AFileSystem.java:getFileStatus(684)) - Getting path status for
  s3a://redactedbucketname/ ()
 
  java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative
  path in absolute URI:
 
 [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv
 
  at org.apache.hadoop.fs.Path.initialize(Path.java:206)
 
  at org.apache.hadoop.fs.Path.init(Path.java:172)
 
  at org.apache.hadoop.fs.Path.init(Path.java:94)
 
  at org.apache.hadoop.fs.Globber.glob(Globber.java:240)
 
  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)
 
  at
 
 org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)
 
  at
 
 org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)
 
  at
 
 org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)
 
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
 
  at scala.Option.getOrElse(Option.scala:120)
 
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 
  at
 
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)
 
  at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)
 
  at scala.Option.getOrElse(Option.scala:120)
 
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
 
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
 
  at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
 
  at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24)
 
  at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29)
 
  at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31)
 
  at $iwC$iwC$iwC$iwC$iwC.init(console:33)
 
  at $iwC$iwC$iwC$iwC.init(console:35)
 
  at $iwC$iwC$iwC.init(console:37)
 
  at 

Re: spark as a lookup engine for dedup

2015-07-27 Thread Romi Kuntsman
An RDD is immutable: it cannot be changed; you can only create a new one from
data or from a transformation. It sounds inefficient to create a new one every
15 seconds covering the last 24 hours.
I think a key-value store will be much more fitted for this purpose.

On Mon, Jul 27, 2015 at 11:21 AM Shushant Arora shushantaror...@gmail.com
wrote:

 its for 1 day events in range of 1 billions and processing is in streaming
 application of ~10-15 sec interval so lookup should be fast.  RDD need to
 be updated with new events and old events of current time-24 hours back
 should be removed at each processing.

 So is spark RDD not fit for this requirement?

 On Mon, Jul 27, 2015 at 1:08 PM, Romi Kuntsman r...@totango.com wrote:

 What is the throughput of processing, and for how long do you need to
 remember duplicates?

 You can take all the events, put them in an RDD, group by the key, and
 then process each key only once.
 But if you have a long running application where you want to check that
 you didn't see the same value before, and check that for every value, you
 probably need a key-value store, not RDD.

 On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com
 wrote:

 Hi

 I have a requirement for processing large events but ignoring duplicate
 at the same time.

 Events are consumed from kafka and each event has a eventid. It may
 happen that an event is already processed and came again at some other
 offset.

 1.Can I use Spark RDD to persist processed events and then lookup with
 this rdd (How to do lookup inside a RDD ?I have a
 JavaPairRDDeventid,timestamp )
 while processing new events and if event is present in  persisted rdd
 ignore it , else process the even. Does rdd.lookup(key) on billion of
 events will be efficient ?

 2. update the rdd (Since RDD is immutable  how to update it)?

 Thanks





Re: spark as a lookup engine for dedup

2015-07-27 Thread Romi Kuntsman
What is the throughput of processing, and for how long do you need to remember
duplicates?

You can take all the events, put them in an RDD, group by the key, and then
process each key only once.
But if you have a long running application where you want to check that you
didn't see the same value before, and check that for every value, you
probably need a key-value store, not RDD.
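
A minimal sketch of the batch group-by approach described above, in Scala
(assuming Spark 1.3+ with an existing SparkContext `sc`; the (eventId,
timestamp) layout and values are illustrative, not taken from the thread):

    import org.apache.spark.rdd.RDD

    // (eventId, timestamp) pairs, possibly containing duplicates
    val events: RDD[(String, Long)] =
      sc.parallelize(Seq(("e1", 1000L), ("e2", 1500L), ("e1", 2000L)))

    // keep a single (earliest) record per eventId, so each key is processed
    // only once within this batch
    val deduped = events.reduceByKey((a, b) => math.min(a, b))

    deduped.collect().foreach(println)   // (e1,1000), (e2,1500)

This only covers one batch at a time; remembering keys across batches and
across 24 hours is exactly where the key-value store suggestion comes in.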

On Sun, Jul 26, 2015 at 7:38 PM Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 I have a requirement to process a large volume of events while ignoring
 duplicates at the same time.

 Events are consumed from Kafka and each event has an eventid. It may happen
 that an event has already been processed and then comes again at some other
 offset.

 1. Can I use a Spark RDD to persist processed events and then look them up
 in this RDD (how do I do a lookup inside an RDD? I have a
 JavaPairRDD<eventid,timestamp>)
 while processing new events, so that if an event is present in the persisted
 RDD it is ignored, and otherwise processed? Will rdd.lookup(key) on a billion
 events be efficient?

 2. How do I update the RDD (since an RDD is immutable, how can it be updated)?

 Thanks




Re: Scaling spark cluster for a running application

2015-07-22 Thread Romi Kuntsman
Are you running the Spark cluster in standalone mode or on YARN?
In standalone mode, the application gets the available resources when it starts.
With YARN, you can try to turn on the setting
*spark.dynamicAllocation.enabled*
See https://spark.apache.org/docs/latest/configuration.html
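
A minimal sketch of turning that on (YARN only; the external shuffle service
must also be enabled for dynamic allocation to work, and the min/max values
below are purely illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("my-app")                               // illustrative name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.maxExecutors", "20")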

On Wed, Jul 22, 2015 at 2:20 PM phagunbaya phagun.b...@falkonry.com wrote:

 I have a Spark cluster running in client mode with the driver outside the
 cluster. I want to scale the cluster after an application is submitted. In
 order to do this, I'm creating new workers and they are getting registered
 with the master, but the issue I'm seeing is that the running application does
 not use the newly added worker. Hence I cannot add more resources to an
 existing running application.

 Is there any other way or config to deal with this use case? How can I make a
 running application ask for executors from a newly added worker node?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Applications metrics unseparatable from Master metrics?

2015-07-22 Thread Romi Kuntsman
Hi,

I tried to enable the Master metrics source (to get the number of
running/waiting applications etc.), and connected it to Graphite.
However, when these are enabled, application metrics are also sent.

Is it possible to separate them, and send only master metrics without
application metrics?

I see that the Master class registers both:
https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91
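
For context, per-instance sink scoping in conf/metrics.properties looks roughly
like the sketch below (host/port are illustrative); as noted above, this alone
does not filter out the application source that the Master registers into its
own metrics system:

    # attach the Graphite sink to the master instance only
    master.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
    master.sink.graphite.host=graphite.example.com
    master.sink.graphite.port=2003
    master.sink.graphite.period=10
    master.sink.graphite.unit=seconds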

Thanks,
RK.


Re: Timestamp functions for sqlContext

2015-07-21 Thread Romi Kuntsman
Hi Tal,

I'm not sure there is currently a built-in function for it, but you can
easily define a UDF (user-defined function) by extending
org.apache.spark.sql.api.java.UDF1, registering it
(sqlContext.udf().register(...)), and then using it inside your query.
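
A rough Scala sketch of the same idea (using the Scala registration API rather
than the Java UDF1 route; it assumes an existing SQLContext `sqlContext`, and
the function, column and table names are only placeholders matching the example
quoted below):

    import java.sql.Timestamp
    import java.util.concurrent.TimeUnit

    sqlContext.udf.register("date_diff_days", (ts: Timestamp) =>
      TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis() - ts.getTime))

    val dateDiffDf = sqlContext.sql(
      "select date_diff_days(rowTimestamp) as date_diff from tableName")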

RK.



On Tue, Jul 21, 2015 at 7:04 PM Tal Rozen t...@scaleka.com wrote:

 Hi,

 I'm running a query with the SQL context where one of the fields is of type
 java.sql.Timestamp. I'd like to apply a function similar to DATEDIFF in
 MySQL, between the date given in each row and now. So if I were able to use
 the same syntax as in MySQL it would be:

 val date_diff_df = sqlContext.sql("select DATEDIFF(curdate(),
 rowTimestamp) date_diff from tableName")

 What are the relevant keywords to replace curdate() and DATEDIFF?

 Thanks








Spark Application stuck retrying task failed on Java heap space?

2015-07-21 Thread Romi Kuntsman
Hello,

*TL;DR: a task crashes with OOM, but the application gets stuck in an infinite
loop retrying the task over and over again instead of failing fast.*

Using Spark 1.4.0, standalone, with DataFrames on Java 7.
I have an application that does some aggregations. I played around with
shuffling settings, which led to the dreaded Java heap space error. See the
stack trace at the end of this message.

When this happens, I see dozens of executors in the EXITED state, a couple in
LOADING and one in RUNNING. All of them are retrying the same task over and
over again, and keep failing with the same Java heap space error. This
goes on for hours!

Why doesn't the whole application fail, when the individual executors keep
failing with the same error?
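
One possibly related knob, not discussed in this thread and only a guess (it
may not help if the failures are being counted as executor losses rather than
task failures), is spark.task.maxFailures, which defaults to 4:

    import org.apache.spark.SparkConf

    // fail the job sooner after repeated failures of the same task (illustrative value)
    val conf = new SparkConf().set("spark.task.maxFailures", "2")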

Thanks,
Romi K.

---

end of the log in a failed task:

15/07/21 11:13:40 INFO executor.Executor: Finished task 117.0 in stage
218.1 (TID 305). 2000 bytes result sent to driver
15/07/21 11:13:41 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 306
15/07/21 11:13:41 INFO executor.Executor: Running task 0.0 in stage 219.1
(TID 306)
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Updating epoch to 420
and clearing cache
15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Started reading
broadcast variable 8
15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(5463) called
with curMem=285917, maxMem=1406164008
15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8_piece0 stored
as bytes in memory (estimated size 5.3 KB, free 1340.7 MB)
15/07/21 11:13:41 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 8 took 22 ms
15/07/21 11:13:41 INFO storage.MemoryStore: ensureFreeSpace(10880) called
with curMem=291380, maxMem=1406164008
15/07/21 11:13:41 INFO storage.MemoryStore: Block broadcast_8 stored as
values in memory (estimated size 10.6 KB, free 1340.7 MB)
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Don't have map outputs
for shuffle 136, fetching them
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint = AkkaRpcEndpointRef(Actor[akka.tcp://
sparkDriver@1.2.3.4:57490/user/MapOutputTracker#-99712578])
15/07/21 11:13:41 INFO spark.MapOutputTrackerWorker: Got the output
locations
15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Getting 182
non-empty blocks out of 182 blocks
15/07/21 11:13:41 INFO storage.ShuffleBlockFetcherIterator: Started 4
remote fetches in 28 ms
15/07/21 11:14:34 ERROR executor.Executor: Exception in task 0.0 in stage
219.1 (TID 306)
java.lang.OutOfMemoryError: Java heap space
at
scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99)
at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47)
at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83)
at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47)
at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:192)
at
org.apache.spark.sql.execution.Sort$$anonfun$doExecute$5$$anonfun$apply$5.apply(basicOperators.scala:190)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at

Re: Guava 11 dependency issue in Spark 1.2.0

2015-01-19 Thread Romi Kuntsman
I have recently encountered a similar problem with a Guava version collision
with Hadoop.

Wouldn't it be more correct to upgrade Hadoop to use the latest Guava? Why are
they staying on version 11? Does anyone know?

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com
wrote:

 Hi Sean,

 I removed the hadoop dependencies from the app and ran it on the cluster.
 It gives a java.io.EOFException

 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with
 curMem=0, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in
 memory (estimated size 173.0 KB, free 1911.2 MB)
 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with
 curMem=177166, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as
 bytes in memory (estimated size 24.9 KB, free 1911.1 MB)
 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in
 memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB)
 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
 broadcast_0_piece0
 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile
 at AvroRelation.scala:45
 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1
 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at
 SparkPlan.scala:84
 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at
 SparkPlan.scala:84) with 2 output partitions (allowLocal=false)
 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at
 SparkPlan.scala:84)
 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List()
 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List()
 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at
 map at SparkPlan.scala:84), which has no missing parents
 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with
 curMem=202668, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in
 memory (estimated size 4.8 KB, free 1911.1 MB)
 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with
 curMem=207532, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as
 bytes in memory (estimated size 3.4 KB, free 1911.1 MB)
 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in
 memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB)
 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
 broadcast_1_piece0
 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast at
 DAGScheduler.scala:838
 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from Stage
 0 (MappedRDD[6] at map at SparkPlan.scala:84)
 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
 10.100.5.109): java.io.EOFException
 at
 java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722)
 at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009)
 at
 org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
 at
 org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
 at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
 at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
 at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
 at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
 at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
 at
 org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
 at
 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1871)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1327)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1969)
 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893

Re: Guava 11 dependency issue in Spark 1.2.0

2015-01-19 Thread Romi Kuntsman
Actually there is already someone on Hadoop-Common-Dev taking care of
removing the old Guava dependency

http://mail-archives.apache.org/mod_mbox/hadoop-common-dev/201501.mbox/browser
https://issues.apache.org/jira/browse/HADOOP-11470

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Mon, Jan 19, 2015 at 4:03 PM, Romi Kuntsman r...@totango.com wrote:

 I have recently encountered a similar problem with a Guava version collision
 with Hadoop.

 Wouldn't it be more correct to upgrade Hadoop to use the latest Guava? Why are
 they staying on version 11? Does anyone know?

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Wed, Jan 7, 2015 at 7:59 AM, Niranda Perera niranda.per...@gmail.com
 wrote:

 Hi Sean,

 I removed the hadoop dependencies from the app and ran it on the cluster.
 It gives a java.io.EOFException

 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(177166) called with
 curMem=0, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0 stored as values in
 memory (estimated size 173.0 KB, free 1911.2 MB)
 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(25502) called with
 curMem=177166, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_0_piece0 stored as
 bytes in memory (estimated size 24.9 KB, free 1911.1 MB)
 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in
 memory on 10.100.5.109:43924 (size: 24.9 KB, free: 1911.3 MB)
 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
 broadcast_0_piece0
 15/01/07 11:19:29 INFO SparkContext: Created broadcast 0 from hadoopFile
 at AvroRelation.scala:45
 15/01/07 11:19:29 INFO FileInputFormat: Total input paths to process : 1
 15/01/07 11:19:29 INFO SparkContext: Starting job: collect at
 SparkPlan.scala:84
 15/01/07 11:19:29 INFO DAGScheduler: Got job 0 (collect at
 SparkPlan.scala:84) with 2 output partitions (allowLocal=false)
 15/01/07 11:19:29 INFO DAGScheduler: Final stage: Stage 0(collect at
 SparkPlan.scala:84)
 15/01/07 11:19:29 INFO DAGScheduler: Parents of final stage: List()
 15/01/07 11:19:29 INFO DAGScheduler: Missing parents: List()
 15/01/07 11:19:29 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at
 map at SparkPlan.scala:84), which has no missing parents
 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(4864) called with
 curMem=202668, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1 stored as values in
 memory (estimated size 4.8 KB, free 1911.1 MB)
 15/01/07 11:19:29 INFO MemoryStore: ensureFreeSpace(3481) called with
 curMem=207532, maxMem=2004174766
 15/01/07 11:19:29 INFO MemoryStore: Block broadcast_1_piece0 stored as
 bytes in memory (estimated size 3.4 KB, free 1911.1 MB)
 15/01/07 11:19:29 INFO BlockManagerInfo: Added broadcast_1_piece0 in
 memory on 10.100.5.109:43924 (size: 3.4 KB, free: 1911.3 MB)
 15/01/07 11:19:29 INFO BlockManagerMaster: Updated info of block
 broadcast_1_piece0
 15/01/07 11:19:29 INFO SparkContext: Created broadcast 1 from broadcast
 at DAGScheduler.scala:838
 15/01/07 11:19:29 INFO DAGScheduler: Submitting 2 missing tasks from
 Stage 0 (MappedRDD[6] at map at SparkPlan.scala:84)
 15/01/07 11:19:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
 15/01/07 11:19:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0
 (TID 0, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
 15/01/07 11:19:29 INFO TaskSetManager: Starting task 1.0 in stage 0.0
 (TID 1, 10.100.5.109, PROCESS_LOCAL, 1340 bytes)
 15/01/07 11:19:29 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
 10.100.5.109): java.io.EOFException
 at
 java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2722)
 at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1009)
 at
 org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
 at
 org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
 at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
 at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
 at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
 at
 org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
 at
 org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
 at
 org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
 at
 org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
 at
 java.io.ObjectInputStream.readSerialData

Re: Announcing Spark 1.1.1!

2014-12-03 Thread Romi Kuntsman
About version compatibility and the upgrade path: can the Java application
dependencies and the Spark server be upgraded separately (i.e. will a 1.1.0
library work with a 1.1.1 server, and vice versa), or do they need to be
upgraded together?

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Dec 2, 2014 at 11:36 PM, Andrew Or and...@databricks.com wrote:

 I am happy to announce the availability of Spark 1.1.1! This is a
 maintenance release with many bug fixes, most of which are concentrated in
 the core. This list includes various fixes to sort-based shuffle, memory
 leak, and spilling issues. Contributions from this release came from 55
 developers.

 Visit the release notes [1] to read about the new features, or
 download [2] the release today.

 [1] http://spark.apache.org/releases/spark-release-1-1-1.html
 [2] http://spark.apache.org/downloads.html

 Please e-mail me directly about any typos in the release notes or name
 listing.

 Thanks to everyone who contributed, and congratulations!
 -Andrew



ExternalAppendOnlyMap: Thread spilling in-memory map of to disk many times slowly

2014-11-26 Thread Romi Kuntsman
Hello,

I have a large data calculation in Spark, distributed across several
nodes. In the end, I want to write to a single output file.

For this I do:
   output.coalesce(1, false).saveAsTextFile(filename).

What happens is all the data from the workers flows to a single worker, and
that one writes the data.
If the data is small enough, it all goes well.
However, for an RDD above a certain size, I get a lot of the following
messages (see below).

From what I understand, ExternalAppendOnlyMap spills the data to disk when
it can't hold it in memory.
Is there a way to tell it to stream the data right to disk, instead of
spilling each block slowly?
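
For comparison, a sketch of the shuffle variant of the same call (coalesce with
shuffle = true keeps the preceding stages at their original parallelism and
funnels only the final write through one task; whether it changes the spilling
behaviour here is an open question, and the output path is just the one from
the log below):

    // same write as above, but with an explicit shuffle before the single-partition stage
    output.coalesce(1, shuffle = true)
      .saveAsTextFile("s3n://mybucket/mydir/output")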

14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 22 ms
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 70 non-empty blocks out of 90 blocks
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (1 time so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (2 times so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 2 ms
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 15 MB to disk (1 time so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 16 MB to disk (2 times so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 14 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (33 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (34 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (35 times so far)
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (1 time so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (2 times so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 9 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (36 times so far)
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 11 MB to disk (37 times so far)
14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task
'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


ExternalAppendOnlyMap: Thread spilling in-memory map of to disk many times slowly

2014-11-24 Thread Romi Kuntsman
Hello,

I have a large data calculation in Spark, distributed across several
nodes. In the end, I want to write to a single output file.

For this I do:
   output.coalesce(1, false).saveAsTextFile(filename).

What happens is all the data from the workers flows to a single worker, and
that one writes the data.
If the data is small enough, it all goes well.
However, for an RDD above a certain size, I get a lot of the following
messages (see below).

From what I understand, ExternalAppendOnlyMap spills the data to disk when
it can't hold it in memory.
Is there a way to tell it to stream the data right to disk, instead of
spilling each block slowly?

14/11/24 12:54:59 INFO MapOutputTrackerWorker: Got the output locations
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 12:54:59 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 22 ms
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 70 non-empty blocks out of 90 blocks
14/11/24 12:55:11 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (1 time so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (2 times so far)
14/11/24 12:55:11 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:28 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 2 ms
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 15 MB to disk (1 time so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 16 MB to disk (2 times so far)
14/11/24 13:13:28 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 14 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (33 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (34 times so far)
14/11/24 13:13:32 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 13 MB to disk (35 times so far)
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 69 non-empty blocks out of 90 blocks
14/11/24 13:13:40 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 3 remote fetches in 4 ms
14/11/24 13:13:40 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (1 time so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 10 MB to disk (2 times so far)
14/11/24 13:13:41 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 9 MB to disk (3 times so far)
[...trimmed...]
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 12 MB to disk (36 times so far)
14/11/24 13:13:45 INFO ExternalAppendOnlyMap: Thread 64 spilling in-memory
map of 11 MB to disk (37 times so far)
14/11/24 13:13:56 INFO FileOutputCommitter: Saved output of task
'attempt_201411241250__m_00_90' to s3n://mybucket/mydir/output

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


Re: Spark job resource allocation best practices

2014-11-04 Thread Romi Kuntsman
How can I configure Mesos allocation policy to share resources between all
current Spark applications? I can't seem to find it in the architecture
docs.

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Yes, I believe Mesos is the right choice for you.
 http://mesos.apache.org/documentation/latest/mesos-architecture/

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com wrote:

 So, as said there, static partitioning is used in Spark’s standalone and
 YARN modes, as well as the coarse-grained Mesos mode.
 That leaves us only with Mesos, where there is *dynamic sharing* of CPU
 cores.

 It says when the application is not running tasks on a machine, other
 applications may run tasks on those cores.
 But my applications are short lived (seconds to minutes), and they read a
 large dataset, process it, and write the results. They are also IO-bound,
 meaning most of the time is spent reading input data (from S3) and writing
 the results back.

 Is it possible to divide the resources between them, according to how
 many are trying to run at the same time?
 So for example if I have 12 cores - if one job is scheduled, it will get
 12 cores, but if 3 are scheduled, then each one will get 4 cores and then
 will all start.

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Have a look at scheduling pools
 https://spark.apache.org/docs/latest/job-scheduling.html. If you want
 more sophisticated resource allocation, then you are better of to use
 cluster managers like mesos or yarn

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote:

 Hello,

 I have a Spark 1.1.0 standalone cluster, with several nodes, and
 several jobs (applications) being scheduled at the same time.
 By default, each Spark job takes up all available CPUs.
 This way, when more than one job is scheduled, all but the first are
 stuck in WAITING.
 On the other hand, if I tell each job to initially limit itself to a
 fixed number of CPUs, and that job runs by itself, the cluster is
 under-utilized and the job runs longer than it could have if it took all
 the available resources.

 - How to give the tasks a more fair resource division, which lets many
 jobs run together, and together lets them use all the available resources?
 - How do you divide resources between applications on your usecase?

 P.S. I started reading about Mesos but couldn't figure out if/how it
 could solve the described issue.

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com







Re: Spark job resource allocation best practices

2014-11-04 Thread Romi Kuntsman
I have a single Spark cluster, not multiple frameworks and not multiple
versions. Is it relevant for my use-case?
Where can I find information about exactly how to make Mesos tell Spark how
many resources of the cluster to use? (instead of the default take-all)

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You can look at different modes over here
 http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes

 These people have a very good tutorial to get you started:
 http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview

 Thanks
 Best Regards

 On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman r...@totango.com wrote:

 How can I configure Mesos allocation policy to share resources between
 all current Spark applications? I can't seem to find it in the architecture
 docs.

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Yes, I believe Mesos is the right choice for you.
 http://mesos.apache.org/documentation/latest/mesos-architecture/

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com wrote:

 So, as said there, static partitioning is used in Spark’s standalone
 and YARN modes, as well as the coarse-grained Mesos mode.
 That leaves us only with Mesos, where there is *dynamic sharing* of
 CPU cores.

 It says when the application is not running tasks on a machine, other
 applications may run tasks on those cores.
 But my applications are short lived (seconds to minutes), and they read
 a large dataset, process it, and write the results. They are also IO-bound,
 meaning most of the time is spent reading input data (from S3) and writing
 the results back.

 Is it possible to divide the resources between them, according to how
 many are trying to run at the same time?
 So for example if I have 12 cores - if one job is scheduled, it will
 get 12 cores, but if 3 are scheduled, then each one will get 4 cores and
 then will all start.

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Have a look at scheduling pools
 https://spark.apache.org/docs/latest/job-scheduling.html. If you
 want more sophisticated resource allocation, then you are better of to use
 cluster managers like mesos or yarn

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com
 wrote:

 Hello,

 I have a Spark 1.1.0 standalone cluster, with several nodes, and
 several jobs (applications) being scheduled at the same time.
 By default, each Spark job takes up all available CPUs.
 This way, when more than one job is scheduled, all but the first are
 stuck in WAITING.
 On the other hand, if I tell each job to initially limit itself to a
 fixed number of CPUs, and that job runs by itself, the cluster is
 under-utilized and the job runs longer than it could have if it took all
 the available resources.

 - How to give the tasks a more fair resource division, which lets
 many jobs run together, and together lets them use all the available
 resources?
 - How do you divide resources between applications on your usecase?

 P.S. I started reading about Mesos but couldn't figure out if/how it
 could solve the described issue.

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com









Re: Spark job resource allocation best practices

2014-11-04 Thread Romi Kuntsman
Let's say that I run Spark on Mesos in fine-grained mode, and I have 12
cores and 64GB memory.
I run application A on Spark, and some time after that (but before A
finished) application B.
How many CPUs will each of them get?

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Tue, Nov 4, 2014 at 11:33 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 You need to install Mesos on your cluster. Then you run your Spark
 applications by specifying the Mesos master (mesos://) instead of (spark://).

 Spark can run over Mesos in two modes: “*fine-grained*” (default) and “
 *coarse-grained*”.

 In “*fine-grained*” mode (default), each Spark task runs as a separate
 Mesos task. This allows multiple instances of Spark (and other frameworks)
 to share machines at a very fine granularity, where each application gets
 more or fewer machines as it ramps up and down, but it comes with an
 additional overhead in launching each task. This mode may be inappropriate
 for low-latency requirements like interactive queries or serving web
 requests.

 The “*coarse-grained*” mode will instead launch only one long-running
 Spark task on each Mesos machine, and dynamically schedule its own
 “mini-tasks” within it. The benefit is much lower startup overhead, but at
 the cost of reserving the Mesos resources for the complete duration of the
 application.

 To run in coarse-grained mode, set the spark.mesos.coarse property in your
 SparkConf:
  conf.set("spark.mesos.coarse", "true")


 In addition, for coarse-grained mode, you can control the maximum number
 of resources Spark will acquire. By default, it will acquire all cores in
 the cluster (that get offered by Mesos), which only makes sense if you run
 just one application at a time. You can cap the maximum number of cores
 using conf.set("spark.cores.max", "10") (for example).
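
 Putting the two settings above together, a minimal sketch (the Mesos master
 URL, application name and cap of 10 cores are all illustrative):

     import org.apache.spark.{SparkConf, SparkContext}

     val conf = new SparkConf()
       .setMaster("mesos://host:5050")      // illustrative Mesos master URL
       .setAppName("my-app")
       .set("spark.mesos.coarse", "true")
       .set("spark.cores.max", "10")
     val sc = new SparkContext(conf)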


 If you run your application in fine-grained mode, then mesos will take
 care of the resource allocation for you. You just tell mesos from your
 application that you are running in fine-grained mode, and it is the
 default mode.

 Thanks
 Best Regards

 On Tue, Nov 4, 2014 at 2:46 PM, Romi Kuntsman r...@totango.com wrote:

 I have a single Spark cluster, not multiple frameworks and not multiple
 versions. Is it relevant for my use-case?
 Where can I find information about exactly how to make Mesos tell Spark
 how many resources of the cluster to use? (instead of the default take-all)

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Tue, Nov 4, 2014 at 11:00 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You can look at different modes over here
 http://docs.sigmoidanalytics.com/index.php/Spark_On_Mesos#Mesos_Run_Modes

 These people has very good tutorial to get you started
 http://mesosphere.com/docs/tutorials/run-spark-on-mesos/#overview

 Thanks
 Best Regards

 On Tue, Nov 4, 2014 at 1:44 PM, Romi Kuntsman r...@totango.com wrote:

 How can I configure Mesos allocation policy to share resources between
 all current Spark applications? I can't seem to find it in the architecture
 docs.

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Tue, Nov 4, 2014 at 9:11 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Yes, I believe Mesos is the right choice for you.
 http://mesos.apache.org/documentation/latest/mesos-architecture/

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:33 PM, Romi Kuntsman r...@totango.com
 wrote:

 So, as said there, static partitioning is used in Spark’s standalone
 and YARN modes, as well as the coarse-grained Mesos mode.
 That leaves us only with Mesos, where there is *dynamic sharing* of
 CPU cores.

 It says when the application is not running tasks on a machine,
 other applications may run tasks on those cores.
 But my applications are short lived (seconds to minutes), and they
 read a large dataset, process it, and write the results. They are also
 IO-bound, meaning most of the time is spent reading input data (from S3)
 and writing the results back.

 Is it possible to divide the resources between them, according to how
 many are trying to run at the same time?
 So for example if I have 12 cores - if one job is scheduled, it will
 get 12 cores, but if 3 are scheduled, then each one will get 4 cores and
 then will all start.

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com

 On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Have a look at scheduling pools
 https://spark.apache.org/docs/latest/job-scheduling.html. If you
 want more sophisticated resource allocation, then you are better of to 
 use
 cluster managers like mesos or yarn

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com
 wrote:

 Hello,

 I have a Spark 1.1.0 standalone cluster, with several nodes, and
 several jobs (applications) being scheduled at the same time.
 By default, each Spark job takes up all available CPUs.
 This way, when more than one job is scheduled, all

Spark job resource allocation best practices

2014-11-03 Thread Romi Kuntsman
Hello,

I have a Spark 1.1.0 standalone cluster, with several nodes, and several
jobs (applications) being scheduled at the same time.
By default, each Spark job takes up all available CPUs.
This way, when more than one job is scheduled, all but the first are stuck
in WAITING.
On the other hand, if I tell each job to initially limit itself to a fixed
number of CPUs, and that job runs by itself, the cluster is under-utilized
and the job runs longer than it could have if it took all the available
resources.
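
For reference, the fixed per-application limit described above is what
spark.cores.max controls in standalone mode; a minimal sketch with an
illustrative cap of 4 cores:

    // cap this application at 4 cores so other applications can run alongside it
    val conf = new org.apache.spark.SparkConf().set("spark.cores.max", "4")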

- How can I achieve a fairer resource division, one that lets many jobs run
together and together use all the available resources?
- How do you divide resources between applications in your use case?

P.S. I started reading about Mesos but couldn't figure out if/how it could
solve the described issue.

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com


Re: Spark job resource allocation best practices

2014-11-03 Thread Romi Kuntsman
So, as said there, static partitioning is used in Spark’s standalone and
YARN modes, as well as in the coarse-grained Mesos mode.
That leaves us only with fine-grained Mesos, where there is *dynamic sharing*
of CPU cores.

It says that when the application is not running tasks on a machine, other
applications may run tasks on those cores.
But my applications are short-lived (seconds to minutes), and they read a
large dataset, process it, and write the results. They are also IO-bound,
meaning most of the time is spent reading input data (from S3) and writing
the results back.

Is it possible to divide the resources between them according to how many
are trying to run at the same time?
So for example, if I have 12 cores: if one job is scheduled, it will get 12
cores, but if 3 are scheduled, then each one will get 4 cores and they will
all start.

Thanks!

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Mon, Nov 3, 2014 at 5:46 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Have a look at scheduling pools:
 https://spark.apache.org/docs/latest/job-scheduling.html. If you want
 more sophisticated resource allocation, then you are better off using
 cluster managers like Mesos or YARN.

 Thanks
 Best Regards

 On Mon, Nov 3, 2014 at 9:10 PM, Romi Kuntsman r...@totango.com wrote:

 Hello,

 I have a Spark 1.1.0 standalone cluster, with several nodes, and several
 jobs (applications) being scheduled at the same time.
 By default, each Spark job takes up all available CPUs.
 This way, when more than one job is scheduled, all but the first are
 stuck in WAITING.
 On the other hand, if I tell each job to initially limit itself to a
 fixed number of CPUs, and that job runs by itself, the cluster is
 under-utilized and the job runs longer than it could have if it took all
 the available resources.

 - How to give the tasks a more fair resource division, which lets many
 jobs run together, and together lets them use all the available resources?
 - How do you divide resources between applications on your usecase?

 P.S. I started reading about Mesos but couldn't figure out if/how it
 could solve the described issue.

 Thanks!

 *Romi Kuntsman*, *Big Data Engineer*
  http://www.totango.com





Re: Dynamically switching Nr of allocated core

2014-11-03 Thread Romi Kuntsman
I didn't notice your message and asked about the same question, in the
thread titled "Spark job resource allocation best practices".

Adding a specific case to your example:
1 - There are 12 cores available in the cluster
2 - I start app B with all cores - it gets 12
3 - I start app A - it needs just 2 cores (which, as you said, is all it would
get even when all 12 are available), but it gets nothing
4 - Until I stop app B, app A is stuck waiting, instead of app B freeing 2
cores and dropping to 10 cores.

*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com

On Mon, Nov 3, 2014 at 3:17 PM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Hi all,

 I can't seem to find a clear answer on the documentation.

 Does the standalone cluster support dynamic assignment of the number of
 allocated cores to an application once another app stops?
 I'm aware that we can have core sharing between active applications if we use
 Mesos, depending on the number of parallel tasks, but I believe my question is
 slightly simpler.

 For example:
 1 - There are 12 cores available in the cluster
 2 - I start app A with 2 cores - gets 2
 3 - I start app B - gets remaining 10
 4 - If I stop app A, app B *does not* get the now available remaining 2
 cores.

 Should I expect Mesos to have this scenario working?

 Also, the same question applies when we add more cores to a cluster.
 Let's say ideally I want 12 cores for my app, although there are only 10. As
 I add more workers, they should get assigned to my app dynamically. I
 haven't tested this in a while, but I think the app will not even start and
 will complain about not enough resources...

 Would very much appreciate any knowledge share on this!

 tnks,
 Rod






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Dynamically-switching-Nr-of-allocated-core-tp17955.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Workers disconnected from master sometimes and never reconnect back

2014-09-29 Thread Romi Kuntsman
Hi all,

Regarding a post here a few months ago:
http://apache-spark-user-list.1001560.n3.nabble.com/Workers-disconnected-from-master-sometimes-and-never-reconnect-back-tp6240.html

Is there an answer to this?
I saw workers still active but not reconnecting after they lost
connection to the master. Using Spark 1.1.0.

If the master server is restarted, should the workers retry registering with it?

Greetings,

-- 
*Romi Kuntsman*, *Big Data Engineer*
 http://www.totango.com
​Join the Customer Success Manifesto  http://youtu.be/XvFi2Wh6wgU