Re: Maximum Core Utilization

2015-05-05 Thread Richard Marscher
Hi,

do you have information on how many partitions/tasks the stage/job is
running? By default there is 1 core per task, and your number of concurrent
tasks may be limiting core utilization.

There are a few settings you could play with, assuming your issue is
related to the above:
spark.default.parallelism
spark.cores.max
spark.task.cpus
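
For illustration, here is a minimal sketch (my own example, not from this thread) of
where those settings plug in; the values and the input path are placeholders to tune
for your cluster:

import org.apache.spark.{SparkConf, SparkContext}

object CoreUtilizationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("core-utilization-sketch")
      .set("spark.default.parallelism", "200") // default partition count for shuffles
      .set("spark.cores.max", "100")           // cores the app may claim on standalone/Mesos
      .set("spark.task.cpus", "1")             // cores reserved per task
    val sc = new SparkContext(conf)

    // You can also raise the partition (and therefore task) count per RDD directly.
    val data = sc.textFile("hdfs:///data/input.txt", 100) // hypothetical path
    println(data.repartition(100).count())
    sc.stop()
  }
}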

On Tue, May 5, 2015 at 3:55 PM, Manu Kaul  wrote:

> Hi All,
> For a job I am running on Spark with a dataset of say 350,000 lines (not
> big), I am finding that even though my cluster has a large number of cores
> available (like 100 cores), the Spark system seems to stop after using just
> 4 cores and after that the runtime is pretty much a straight line no matter
> how many more cores are thrown at it. I am wondering if Spark tries to
> figure out the maximum no. of cores to use based on the size of the
> dataset? If yes, is there a way to disable this feature and force it to use
> all the cores available?
>
> Thanks,
> Manu
>
> --
>
> The greater danger for most of us lies not in setting our aim too high and
> falling short; but in setting our aim too low, and achieving our mark.
> - Michelangelo
>


Re: Spark Job triggers second attempt

2015-05-07 Thread Richard Marscher
Hi,

I think you may want to use this setting:

spark.task.maxFailures (default: 4): Number of individual task failures before giving
up on the job. Should be greater than or equal to 1. Number of allowed retries = this
value - 1.
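
For example, a minimal sketch (my own, not from the thread) that fails fast after the
first task failure. Note that your spark.yarn.maxAppAttempts setting governs YARN
application attempts, not task retries, so it is included only for completeness:

import org.apache.spark.{SparkConf, SparkContext}

object FailFastSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fail-fast-sketch")
      .set("spark.task.maxFailures", "1")    // 1 means no task retries
      .set("spark.yarn.maxAppAttempts", "1") // application attempts, YARN only
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).foreach(println)
    sc.stop()
  }
}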

On Thu, May 7, 2015 at 2:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> How i can stop Spark to stop triggering second attempt in case the first
> fails.
> I do not want to wait for the second attempt to fail again so that i can
> debug faster.
>
> .set("spark.yarn.maxAppAttempts", "0") OR .set("spark.yarn.maxAppAttempts",
> "1")
>
> is not helping.
>
> --
> Deepak
>
>


Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Richard Marscher
By default you would expect to find the log files for the master and workers
in the relative `logs` directory from the root of the Spark installation on
each of the respective nodes in the cluster.

On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  Ø  Can you check your local and remote logs?
>
>
>
> Where are the log files? I see the following in my Driver program logs as
> well as the Spark UI failed task page
>
>
>
> java.io.IOException: org.apache.spark.SparkException: Failed to get
> broadcast_2_piece0 of broadcast_2
>
>
>
> Here is the detailed stack trace.
>
> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
> org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0 of broadcast_2
>
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>
>
> Ningjun
>
>
>
> *From:* Jonathan Coveney [mailto:jcove...@gmail.com]
> *Sent:* Wednesday, May 06, 2015 5:23 PM
> *To:* Wang, Ningjun (LNG-NPV)
> *Cc:* Ted Yu; user@spark.apache.org
>
> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0
>
>
>
> Can you check your local and remote logs?
>
>
>
> 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com>:
>
> This problem happen in Spark 1.3.1.  It happen when two jobs are running
> simultaneously each in its own Spark Context.
>
>
>
> I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
> introduced in Spark 1.3.1?
>
>
>
> Ningjun
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Wednesday, May 06, 2015 11:32 AM
> *To:* Wang, Ningjun (LNG-NPV)
> *Cc:* user@spark.apache.org
> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0
>
>
>
> Which release of Spark are you using ?
>
>
>
> Thanks
>
>
> On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>  I run a job on spark standalone cluster and got the exception below
>
>
>
> Here is the line of code that cause problem
>
>
>
> val myRdd: RDD[(String, String, String)] = … // RDD of (docid, cattegory, path)
>
> myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>
> val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  // This line cause the exception
>
>
>
>
>
> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
> org.apache.spark.SparkException:
> Failed to get broadcast_2_piece0 of broadcast_2
>
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_2_piece0 of broadcast_2
>
> at
> org.apache.spark.broadcast.T

Re: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_2_piece0

2015-05-07 Thread Richard Marscher
I should also add I've recently seen this issue as well when using collect.
I believe in my case it was related to heap space on the driver program not
being able to handle the returned collection.
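
In case it helps, a minimal sketch (my own, not from the thread) of the two usual
mitigations: give the driver more heap, or avoid pulling the whole collection back to
the driver at all. Paths and sizes are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object CollectHeapSketch {
  def main(args: Array[String]): Unit = {
    // spark.driver.memory must be set before the driver JVM starts, e.g. via
    // spark-submit --driver-memory 4g; it cannot be raised from inside main().
    val sc = new SparkContext(new SparkConf().setAppName("collect-heap-sketch"))

    val rdd = sc.parallelize(1 to 1000000)
    // Instead of rdd.collect(), take a bounded sample onto the driver...
    val preview = rdd.take(100)
    println(preview.mkString(","))
    // ...or keep the result distributed and write it out from the executors.
    rdd.saveAsTextFile("hdfs:///tmp/collect-heap-sketch-output") // hypothetical path
    sc.stop()
  }
}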

On Thu, May 7, 2015 at 11:05 AM, Richard Marscher 
wrote:

> By default you would expect to find the logs files for master and workers
> in the relative `logs` directory from the root of the Spark installation on
> each of the respective nodes in the cluster.
>
> On Thu, May 7, 2015 at 10:27 AM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>>  Ø  Can you check your local and remote logs?
>>
>>
>>
>> Where are the log files? I see the following in my Driver program logs as
>> well as the Spark UI failed task page
>>
>>
>>
>> java.io.IOException: org.apache.spark.SparkException: Failed to get
>> broadcast_2_piece0 of broadcast_2
>>
>>
>>
>> Here is the detailed stack trace.
>>
>> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
>> org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0 of broadcast_2
>>
>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1156)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>
>> at
>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
>>
>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:61)
>>
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>>
>>
>>
>> Ningjun
>>
>>
>>
>> *From:* Jonathan Coveney [mailto:jcove...@gmail.com]
>> *Sent:* Wednesday, May 06, 2015 5:23 PM
>> *To:* Wang, Ningjun (LNG-NPV)
>> *Cc:* Ted Yu; user@spark.apache.org
>>
>> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0
>>
>>
>>
>> Can you check your local and remote logs?
>>
>>
>>
>> 2015-05-06 16:24 GMT-04:00 Wang, Ningjun (LNG-NPV) <
>> ningjun.w...@lexisnexis.com>:
>>
>> This problem happen in Spark 1.3.1.  It happen when two jobs are running
>> simultaneously each in its own Spark Context.
>>
>>
>>
>> I don’t remember seeing this bug in Spark 1.2.0. Is it a new bug
>> introduced in Spark 1.3.1?
>>
>>
>>
>> Ningjun
>>
>>
>>
>> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
>> *Sent:* Wednesday, May 06, 2015 11:32 AM
>> *To:* Wang, Ningjun (LNG-NPV)
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: java.io.IOException: org.apache.spark.SparkException:
>> Failed to get broadcast_2_piece0
>>
>>
>>
>> Which release of Spark are you using ?
>>
>>
>>
>> Thanks
>>
>>
>> On May 6, 2015, at 8:03 AM, Wang, Ningjun (LNG-NPV) <
>> ningjun.w...@lexisnexis.com> wrote:
>>
>>  I run a job on spark standalone cluster and got the exception below
>>
>>
>>
>> Here is the line of code that cause problem
>>
>>
>>
>> val myRdd: RDD[(String, String, String)] = … // RDD of (docid, cattegory, path)
>>
>> myRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
>>
>> val cats: Array[String] = myRdd.map(t => t._2).distinct().collect()  // This line cause the exception
>>
>>
>>
>>
>>
>> 15/05/06 10:48:51 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> LAB4-WIN03.pcc.lexisnexis.com): java.io.IOException: 
>> org.apache.spark.SparkException:
>> Failed t

Re: Looking inside the 'mapPartitions' transformation, some confused observations

2015-05-11 Thread Richard Marscher
I believe the issue in attempts b and c is that you call iter.size, which consumes
the iterator, so the subsequent attempt to copy it into a vector will yield 0
items. You could use an ArrayBuilder, for example, and not need to rely on
knowing the size of the iterator.
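
A minimal sketch of what I mean (adapted from the example quoted below; the
ArrayBuilder part is my own illustration and assumes breeze is on the classpath):

import breeze.linalg.DenseVector
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuilder

object MapPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]"))
    val a = sc.parallelize(1 to 100, 10)

    val vectors = a.mapPartitions { iter =>
      // Never call iter.size first: it consumes the iterator.
      // Build up the array instead (or simply use iter.toArray, as in attempt d).
      val builder = ArrayBuilder.make[Int]()
      while (iter.hasNext) {
        builder += iter.next()
      }
      Iterator.single(DenseVector(builder.result()))
    }
    println(vectors.count())
    sc.stop()
  }
}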

On Mon, May 11, 2015 at 2:26 PM, myasuka  wrote:

> As we all know, a partition in Spark is actually an Iterator[T]. For some
> purpose, I want to treat each partition not an Iterator but one whole
> object. For example, treat Iterator[Int] to a
> breeze.linalg.DenseVector[Int]. Thus I use 'mapPartitions' API to achieve
> this, however, during the implementation, I observed some confused
> observations.
> I use Spark 1.3.0 on 10 executor nodes cluster, below is different
> attempts:
>
> import breeze.linalg.DenseVector
>
> val a = sc.parallelize( 1 to 100, 10)
>
> val b = a.mapPartitions(iter =>{val v = Array.ofDim[Int](iter.size)
> var ind = 0
> while(iter.hasNext){
>  v(ind) = iter.next
>  ind += 1
> }
> println(v.mkString(","))
> Iterator.single[DenseVector[Int]](DenseVector(v))}
> )
> b.count()
>
> val c = a.mapPartitions(iter =>{val v = Array.ofDim[Int](iter.size)
> iter.copyToArray(v, 0, 10)
> println(v.mkString(","))
> Iterator.single[DenseVector[Int]](DenseVector(v))}
> )
> c.count()
>
> val d = a.mapPartitions(iter =>{val v = iter.toArray
> println(v.mkString(","))
> Iterator.single[DenseVector[Int]](DenseVector(v))}
> )
> d.count()
>
> I can see the printed output in the executor's stdout, actually only
> attempt
> 'd' satisfy my needs, and other attempts only get a zero desevector, which
> means the variable assignment from iterator to vector did not happen.
>
> Hope for explanations for these observations.
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Looking-inside-the-mapPartitions-transformation-some-confused-observations-tp22850.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-15 Thread Richard Marscher
The doc is a bit confusing IMO, but at least for my application I had to
use a fair pool configuration to get my stages to be scheduled with FAIR.
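
For reference, a hedged sketch of the kind of setup that worked for me; the pool
name, file path, and XML contents here are placeholders rather than my actual
configuration:

// conf/fairscheduler.xml (placeholder content):
//   <allocations>
//     <pool name="streaming">
//       <schedulingMode>FAIR</schedulingMode>
//       <weight>1</weight>
//       <minShare>2</minShare>
//     </pool>
//   </allocations>
import org.apache.spark.{SparkConf, SparkContext}

object FairPoolSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fair-pool-sketch")
      .set("spark.scheduler.mode", "FAIR")
      .set("spark.scheduler.allocation.file", "/path/to/conf/fairscheduler.xml") // hypothetical path
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go into the named pool.
    sc.setLocalProperty("spark.scheduler.pool", "streaming")
    sc.parallelize(1 to 1000).count()
    sc.stop()
  }
}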

On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov  wrote:

> No pools for the moment – for each of the apps using the straightforward
> way with the spark conf param for scheduling = FAIR
>
>
>
> Spark is running in a Standalone Mode
>
>
>
> Are you saying that Configuring Pools is mandatory to get the FAIR
> scheduling working – from the docs it seemed optional to me
>
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Friday, May 15, 2015 6:45 PM
> *To:* Evo Eftimov
> *Cc:* user
> *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
>
>
>
> How are you configuring the fair scheduler pools?
>
>
>
> On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov 
> wrote:
>
> I have run / submitted a few Spark Streaming apps configured with Fair
> scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode.
> Is FAIR scheduling supported at all for Spark Streaming apps and from what
> release / version - e.g. 1.3.1
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond

2015-05-15 Thread Richard Marscher
It's not a Spark Streaming app, so sorry I'm not sure of the answer to
that. I would assume it should work.

On Fri, May 15, 2015 at 2:22 PM, Evo Eftimov  wrote:

> Ok thanks a lot for clarifying that – btw was your application a Spark
> Streaming App – I am also looking for confirmation that FAIR scheduling is
> supported for Spark Streaming Apps
>
>
>
> *From:* Richard Marscher [mailto:rmarsc...@localytics.com]
> *Sent:* Friday, May 15, 2015 7:20 PM
> *To:* Evo Eftimov
> *Cc:* Tathagata Das; user
> *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
>
>
>
> The doc is a bit confusing IMO, but at least for my application I had to
> use a fair pool configuration to get my stages to be scheduled with FAIR.
>
>
>
> On Fri, May 15, 2015 at 2:13 PM, Evo Eftimov 
> wrote:
>
> No pools for the moment – for each of the apps using the straightforward
> way with the spark conf param for scheduling = FAIR
>
>
>
> Spark is running in a Standalone Mode
>
>
>
> Are you saying that Configuring Pools is mandatory to get the FAIR
> scheduling working – from the docs it seemed optional to me
>
>
>
> *From:* Tathagata Das [mailto:t...@databricks.com]
> *Sent:* Friday, May 15, 2015 6:45 PM
> *To:* Evo Eftimov
> *Cc:* user
> *Subject:* Re: Spark Fair Scheduler for Spark Streaming - 1.2 and beyond
>
>
>
> How are you configuring the fair scheduler pools?
>
>
>
> On Fri, May 15, 2015 at 8:33 AM, Evo Eftimov 
> wrote:
>
> I have run / submitted a few Spark Streaming apps configured with Fair
> scheduling on Spark Streaming 1.2.0, however they still run in a FIFO mode.
> Is FAIR scheduling supported at all for Spark Streaming apps and from what
> release / version - e.g. 1.3.1
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Fair-Scheduler-for-Spark-Streaming-1-2-and-beyond-tp22902.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>


Event Logging to HDFS on Standalone Cluster "In Progress"

2015-06-01 Thread Richard Marscher
Hi,

In Spark 1.3.0 I've enabled event logging to write to an existing HDFS
folder on a Standalone cluster. This is generally working; all the logs are
being written. However, from the Master Web UI, the vast majority of
completed applications are labeled as not having a history:
http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.&title=Application%20history%20not%20found%20(app-20150601160846-1914)

The log does exist, though:

# hdfs dfs -ls -R /eventLogs/app-20150601160846-1914

-rw-rw   3 user group1027848 2015-06-01 16:09
/eventLogs/app-20150601160846-1914

and `cat` the file ends with:

{"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077}

This seems to indicate it saw and logged the application end.

Is there a known issue here or a workaround? Looking at the source code I
might have expected these files to end in `.inprogress` given the UI error
message, but they don't.

Thanks,
Richard


Re: Event Logging to HDFS on Standalone Cluster "In Progress"

2015-06-01 Thread Richard Marscher
It looks like it is possibly a race condition between removing the
`.inprogress` suffix and building the history UI for the application.

`AppClient` sends an `UnregisterApplication(appId)` message to the `Master`
actor, which triggers the process to look for the app's eventLogs. If they
are suffixed with `.inprogress` then it will not build out the history UI
and instead build the error page I've seen.

Tying this together, calling SparkContext.stop() has the following block:


if (_dagScheduler != null) {
  _dagScheduler.stop()
  _dagScheduler = null
}
if (_listenerBusStarted) {
  listenerBus.stop()
  _listenerBusStarted = false
}
_eventLogger.foreach(_.stop())

Dag Scheduler has a TaskScheduler which has a SparkDeploySchedulerBackend
which has an AppClient. AppClient sends itself a message to stop itself,
and like mentioned above, it then sends a message to the Master where it
tries to build the history UI.

Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix is
removed in the file system. It seems like the race condition in the Akka
message passing that triggers the Master's building of the history UI may be
the only reason the history UI ever gets properly set up in the first place.
Because if the ordering of calls were all strict in the SparkContext.stop
method, then you would expect the Master to always see the event logs as in
progress.

Maybe I have missed something in tracing through the code? Is there a
reason that the eventLogger cannot be closed before the dagScheduler?

Thanks,
Richard

On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher 
wrote:

> Hi,
>
> In Spark 1.3.0 I've enabled event logging to write to an existing HDFS
> folder on a Standalone cluster. This is generally working, all the logs are
> being written. However, from the Master Web UI, the vast majority of
> completed applications are labeled as not having a history:
> http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.&title=Application%20history%20not%20found%20(app-20150601160846-1914)
>
> The log does exists though:
>
> # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914
>
> -rw-rw   3 user group1027848 2015-06-01 16:09
> /eventLogs/app-20150601160846-1914
>
> and `cat` the file ends with:
>
> {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077}
>
> This seems to indicate it saw and logged the application end.
>
> Is there a known issue here or a workaround? Looking at the source code I
> might have expected these files to end in `.inprogress` given the UI error
> message, but they don't.
>
> Thanks,
> Richard
>


Re: Event Logging to HDFS on Standalone Cluster "In Progress"

2015-06-01 Thread Richard Marscher
Ah, apologies, I found an existing issue; a fix has already gone out for
this in 1.3.1 and up: https://issues.apache.org/jira/browse/SPARK-6036.

On Mon, Jun 1, 2015 at 3:39 PM, Richard Marscher 
wrote:

> It looks like it is possibly a race condition between removing the
> IN_PROGRESS and building the history UI for the application.
>
> `AppClient` sends an `UnregisterApplication(appId)` message to the
> `Master` actor, which triggers the process to look for the app's eventLogs.
> If they are suffixed with `.inprogress` then it will not build out the
> history UI and instead build the error page I've seen.
>
> Tying this together, calling SparkContext.stop() has the following block:
>
>
> if (_dagScheduler != null) {
>   _dagScheduler.stop()
>   _dagScheduler = null
> }
> if (_listenerBusStarted) {
>   listenerBus.stop()
>   _listenerBusStarted = false
> }
> _eventLogger.foreach(_.stop())
>
> Dag Scheduler has a TaskScheduler which has a SparkDeploySchedulerBackend
> which has an AppClient. AppClient sends itself a message to stop itself,
> and like mentioned above, it then sends a message to the Master where it
> tries to build the history UI.
>
> Meanwhile, EventLoggingListener.stop() is where the `.inprogress` suffix
> is removed in the file-system. It seems like the race condition of the Akka
> message passing to trigger the Master's building of the history UI may be
> the only reason the history UI ever gets properly setup in the first place.
> Because if the ordering of calls were all strict in the SparkContext.stop
> method then you would expect the Master to always see the event logs as in
> in progress.
>
> Maybe I have missed something in tracing through the code? Is there a
> reason that the eventLogger cannot be closed before the dagScheduler?
>
> Thanks,
> Richard
>
> On Mon, Jun 1, 2015 at 12:23 PM, Richard Marscher <
> rmarsc...@localytics.com> wrote:
>
>> Hi,
>>
>> In Spark 1.3.0 I've enabled event logging to write to an existing HDFS
>> folder on a Standalone cluster. This is generally working, all the logs are
>> being written. However, from the Master Web UI, the vast majority of
>> completed applications are labeled as not having a history:
>> http://xxx.xxx.xxx.xxx:8080/history/not-found/?msg=Application+App+is+still+in+progress.&title=Application%20history%20not%20found%20(app-20150601160846-1914)
>>
>> The log does exists though:
>>
>> # hdfs dfs -ls -R /eventLogs/app-20150601160846-1914
>>
>> -rw-rw   3 user group1027848 2015-06-01 16:09
>> /eventLogs/app-20150601160846-1914
>>
>> and `cat` the file ends with:
>>
>> {"Event":"SparkListenerApplicationEnd","Timestamp":1433174955077}
>>
>> This seems to indicate it saw and logged the application end.
>>
>> Is there a known issue here or a workaround? Looking at the source code I
>> might have expected these files to end in `.inprogress` given the UI error
>> message, but they don't.
>>
>> Thanks,
>> Richard
>>
>
>


Re: flatMap output on disk / flatMap memory overhead

2015-06-02 Thread Richard Marscher
Are you sure it's memory related? What is the disk utilization and IO
performance on the workers? The error you posted looks to be related to a
shuffle trying to obtain block data from another worker node and failing to
do so in a reasonable amount of time. It may still be memory related, but I'm
not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea 
wrote:

> I was tried using reduceByKey, without success.
>
> I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey .
> However, I got the same error as before, namely the error described here:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html
>
> My task is to count the frequencies of pairs of words that occur in a set
> of
> documents at least 5 times. I know that this final output is sparse and
> should comfortably fit in memory. However, the intermediate pairs that are
> spilled by flatMap might need to be stored on the disk, but I don't
> understand why the persist option does not work and my job fails.
>
> My code:
>
> rdd.persist(StorageLevel.MEMORY_AND_DISK)
>  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type
> ((word1,word2) , 1)
> .reduceByKey((a,b) => (a + b).toShort)
> .filter({case((x,y),count) => count >= 5})
>
>
> My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One
> node I keep for the master, 7 nodes for the workers.
>
> my conf:
>
> conf.set("spark.cores.max", "128")
> conf.set("spark.akka.frameSize", "1024")
> conf.set("spark.executor.memory", "115g")
> conf.set("spark.shuffle.file.buffer.kb", "1000")
>
> my spark-env.sh:
>  ulimit -n 20
>  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit
> -XX:-UseCompressedOops"
>  SPARK_DRIVER_MEMORY=129G
>
> spark version: 1.1.1
>
> Thank you a lot for your help!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Application is "always" in process when I check out logs of completed application

2015-06-03 Thread Richard Marscher
I had the same issue a couple days ago. It's a bug in 1.3.0 that is fixed
in 1.3.1 and up.

https://issues.apache.org/jira/browse/SPARK-6036

The event logs are moved from in-progress to complete only after the Master
tries to build the history page for them. The only reason it even works on
occasion in 1.3.0 is that the Master's part runs asynchronously while the
event log status change is synchronous, so as a race condition the Master's
part could sometimes be executed afterwards.

On Wed, Jun 3, 2015 at 2:17 AM, ayan guha  wrote:

> Have you done sc.stop() ? :)
> On 3 Jun 2015 14:05, "amghost"  wrote:
>
>> I run spark application in spark standalone cluster with client deploy
>> mode.
>> I want to check out the logs of my finished application, but I always
>> get  a
>> page telling me "Application history not found - Application xxx is still
>> in
>> process".
>> I am pretty sure that the application has indeed completed because I can
>> see
>> it in the Completed Applications list show by Spark WebUI, and I have also
>> found the log file with suffix ".inprocess"in the directory set by
>> "spark.eventLog.dir" in my spark-default.conf
>>
>> Oh, BTW, I am using spark 1.3.0
>>
>> So, is there anything I missed?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: Spark Client

2015-06-03 Thread Richard Marscher
I think the short answer to the question is no, there is no alternate API
that will not use the System.exit calls. You can craft a workaround like the
one being suggested in this thread. For comparison, we are doing programmatic
submission of applications in a long-running client application. To get
around these issues we make a shadowed version of some of the Spark code in
our application to remove the System.exit calls, so that exceptions bubble up
to our application instead.

On Wed, Jun 3, 2015 at 7:19 AM, Akhil Das 
wrote:

> Did you try this?
>
> Create an sbt project like:
>
>  // Create your context
>  val sconf = new
> SparkConf().setAppName("Sigmoid").setMaster("spark://sigmoid:7077")
>  val sc = new SparkContext(sconf)
>
>  // Do some computations
>  sc.parallelize(1 to 1).take(10).foreach(println)
>
>  //Now return the exit status
>  System.exit(Some number)
>
>  Now, make your workflow manager to trigger *sbt run* on the project
> instead of using spark-submit.
>
>
>
> Thanks
> Best Regards
>
> On Wed, Jun 3, 2015 at 2:18 PM, pavan kumar Kolamuri <
> pavan.kolam...@gmail.com> wrote:
>
>> Hi akhil , sorry i may not conveying the question properly .  Actually we
>> are looking to Launch a spark job from a long running workflow manager,
>> which invokes spark client via SparkSubmit. Unfortunately the client upon
>> successful completion of the application exits with a System.exit(0) or
>> System.exit(NON_ZERO) when there is a failure. Question is, Is there an
>> alternate  api though which a spark application can be launched which can
>> return a exit status back to the caller as opposed to initiating JVM halt.
>>
>> On Wed, Jun 3, 2015 at 12:58 PM, Akhil Das 
>> wrote:
>>
>>> Run it as a standalone application. Create an sbt project and do sbt run?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Wed, Jun 3, 2015 at 11:36 AM, pavan kumar Kolamuri <
>>> pavan.kolam...@gmail.com> wrote:
>>>
 Hi guys , i am new to spark . I am using sparksubmit to submit spark
 jobs. But for my use case i don't want it to be exit with System.exit . Is
 there any other spark client which is api friendly other than SparkSubmit
 which shouldn't exit with system.exit. Please correct me if i am missing
 something.

 Thanks in advance




 --
 Regards
 Pavan Kumar Kolamuri


>>>
>>
>>
>> --
>> Regards
>> Pavan Kumar Kolamuri
>>
>>
>


Re: Scaling spark jobs returning large amount of data

2015-06-04 Thread Richard Marscher
It is possible to start multiple concurrent drivers; Spark dynamically
allocates ports per "spark application" on the driver, master, and workers
from a port range. When you collect results back to the driver, they do not
go through the master. The master is mostly there as a coordinator between
the driver and the cluster of worker nodes; otherwise the workers and driver
communicate directly for the underlying workload.

A "spark application" relates to one instance of a SparkContext
programmatically or to one call to one of the spark submit scripts.
Assuming you don't have dynamic resource allocation setup, each application
takes a fixed amount of the cluster resources to run. So as long as you
subdivide your cluster resources properly you can run multiple concurrent
applications against it. We are doing this in production presently.

Alternately, as Igor suggests, you can share a Spark application and launch
different jobs within it. They will share the resources allocated to the
application in this case. An effect of this is you will only have a finite
number of concurrent Spark tasks (roughly, one task can execute one partition
of a job at a time). If you launch multiple independent jobs within the same
application you will likely want to enable fair job scheduling; otherwise
stages of independent jobs will run in FIFO order instead of interleaving
execution, as in the sketch below.
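
To illustrate the shared-application option, here is a minimal sketch (my own
example, not from the thread) that launches independent jobs concurrently against
one SparkContext with fair scheduling enabled:

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcurrentJobsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("concurrent-jobs-sketch")
      .set("spark.scheduler.mode", "FAIR") // interleave stages of independent jobs
    val sc = new SparkContext(conf)

    // Two independent "reports" running concurrently in the same application.
    val reportA = Future { sc.parallelize(1 to 1000000).map(_ * 2).count() }
    val reportB = Future { sc.parallelize(1 to 1000000).filter(_ % 3 == 0).count() }

    val results = Await.result(Future.sequence(Seq(reportA, reportB)), 10.minutes)
    println(s"results: $results")
    sc.stop()
  }
}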

Hope this helps,
Richard

On Thu, Jun 4, 2015 at 11:20 AM, Igor Berman  wrote:

> Hi,
> as far as I understand you shouldn't send data to driver. Suppose you have
> file in hdfs/s3 or cassandra partitioning, you should create your job such
> that every executor/worker of spark will handle part of your input,
> transform, filter it and at the end write back to cassandra as output(once
> again every executor/core inside worker will write part of the output, in
> your case they will write part of report)
>
> In general I find that submitting multiple jobs in same spark context(aka
> driver) is more performant(you don't pay startup-shutdown time), for this
> some use rest server for submitting jobs to long running spark
> context(driver)
>
> I'm not sure you can run multiple concurrent drivers because of ports
>
> On 4 June 2015 at 17:30, Giuseppe Sarno  wrote:
>
>>  Hello,
>>
>> I am relatively new to spark and I am currently trying to understand how
>> to scale large numbers of jobs with spark.
>>
>> I understand that spark architecture is split in “Driver”, “Master” and
>> “Workers”. Master has a standby node in case of failure and workers can
>> scale out.
>>
>> All the examples I have seen show Spark been able to distribute the load
>> to the workers and returning small amount of data to the Driver. In my case
>> I would like to explore the scenario where I need to generate a large
>> report on data stored on Cassandra and understand how Spark architecture
>> will handle this case when multiple report jobs will be running in parallel.
>>
>> According to this  presentation
>> https://trongkhoanguyenblog.wordpress.com/2015/01/07/understand-the-spark-deployment-modes/
>> responses from workers go through the Master and finally to the Driver.
>> Does this mean that the Driver and/ or Master is a single point for all the
>> responses coming back from workers ?
>>
>> Is it possible to start multiple concurrent Drivers ?
>>
>>
>>
>> Regards,
>>
>> Giuseppe.
>>
>>
>>
>> Fair Isaac Services Limited (Co. No. 01998476) and Fair Isaac (Adeptra)
>> Limited (Co. No. 03295455) are registered in England and Wales and have a
>> registered office address of Cottons Centre, 5th Floor, Hays Lane, London,
>> SE1 2QP.
>>
>> This email and any files transmitted with it are confidential,
>> proprietary and intended solely for the individual or entity to whom they
>> are addressed. If you have received this email in error please delete it
>> immediately.
>>
>
>


Re: Deduping events using Spark

2015-06-04 Thread Richard Marscher
I think if you create a bidirectional mapping from AnalyticsEvent to
another type that wraps it and uses the nonce for equality, you could then
do something like reduceByKey to group by nonce and map back to
AnalyticsEvent afterwards.
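
A rough sketch of the keying approach; the case class here is a placeholder
stand-in for the poster's Avro-generated type, and keying directly by the nonce
(rather than a wrapper) is my own simplification:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Placeholder for the poster's Avro-generated event type; only the nonce matters here.
case class AnalyticsEvent(nonce: String, payload: String)

object DedupeSketch {
  def dedupeByNonce(events: RDD[AnalyticsEvent]): RDD[AnalyticsEvent] =
    events
      .keyBy(_.nonce)            // key each event by its nonce
      .reduceByKey((a, b) => a)  // keep one arbitrary event per nonce
      .values                    // drop the key, back to plain events

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("dedupe-sketch").setMaster("local[2]"))
    val events = sc.parallelize(Seq(
      AnalyticsEvent("n1", "a"), AnalyticsEvent("n1", "b"), AnalyticsEvent("n2", "c")))
    dedupeByNonce(events).collect().foreach(println)
    sc.stop()
  }
}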

On Thu, Jun 4, 2015 at 1:10 PM, lbierman  wrote:

> I'm still a bit new to Spark and am struggilng to figure out the best way
> to
> Dedupe my events.
>
> I load my Avro files from HDFS and then I want to dedupe events that have
> the same nonce.
>
> For example my code so far:
>
>  JavaRDD events = ((JavaRDD>)
> context.newAPIHadoopRDD(
> context.hadoopConfiguration(),
> AvroKeyInputFormat.class,
> AvroKey.class,
> NullWritable.class
> ).keys())
> .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
> .filter(key -> { return
> Optional.ofNullable(key.getStepEventKey()).isPresent(); })
>
> Now I want to get back an RDD of AnalyticsEvents that are unique. So I
> basically want to do:
> if AnalyticsEvent.getNonce() == AnalyticsEvent2.getNonce() only return 1 of
> them.
>
> I'm not sure how to do this? If I do reduceByKey it reduces by
> AnalyticsEvent not by the values inside?
>
> Any guidance would be much appreciated how I can walk this list of events
> and only return a filtered version of unique nocnes.
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Deduping-events-using-Spark-tp23153.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


FileOutputCommitter deadlock 1.3.1

2015-06-08 Thread Richard Marscher
Hi,

we've been seeing occasional issues in production with the FileOutputCommitter
reaching a deadlock situation.

We are writing our data to S3 and currently have speculation enabled. What
we see is that Spark gets a file-not-found error trying to access a
temporary part file that it wrote (it seems to be the part-#2 file every
time?), so the task fails. But the file actually exists in S3, so subsequent
speculations and task retries all fail because the committer tells them the
file exists. This will persist until human intervention kills the
application. Usually rerunning the application will succeed on the next try,
so it is not deterministic with the dataset or anything.

It seems like there isn't a good story yet for file writing and speculation
(https://issues.apache.org/jira/browse/SPARK-4879), although our error here
seems worse than the reports in that issue since I believe ours deadlocks and
those don't?

Has anyone else observed deadlocking like this?

Thanks,
Richard


Re: spark eventLog and history server

2015-06-09 Thread Richard Marscher
Hi,

I don't have a complete answer to your questions but:

"Removing the suffix does not solve the problem" -> unfortunately this is
true, the master web UI only tries to build out a Spark UI from the event
logs once, at the time the context is closed. If the event logs are
in-progress at this time, then you basically missed the opportunity.

"Does it mean I don't need to start history server if I only use spark in
standalone mode?" -> Yes, you don't need to start the history server.

On Mon, Jun 8, 2015 at 7:57 PM, Du Li  wrote:

> Event log is enabled in my spark streaming app. My code runs in standalone
> mode and the spark version is 1.3.1. I periodically stop and restart the
> streaming context by calling ssc.stop(). However, from the web UI, when
> clicking on a past job, it says the job is still in progress and does not
> show the event log. The event log files have suffix .inprogress. Removing
> the suffix does not solve the problem. Do I need to do anything here in
> order to view the event logs of finished jobs? Or do I need to stop ssc
> differently?
>
> In addition, the documentation seems to suggest history server is used for
> Mesos or YARN mode. Does it mean I don't need to start history server if I
> only use spark in standalone mode?
>
> Thanks,
> Du
>


Re: append file on hdfs

2015-06-10 Thread Richard Marscher
Hi,

if you now want to write 1 file per partition, that's actually built into
Spark as

saveAsTextFile(path): Write the elements of the dataset as a text file
(or set of text files) in a given directory in the local filesystem, HDFS
or any other Hadoop-supported file system. Spark will call toString on each
element to convert it to a line of text in the file.
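
A minimal sketch of that approach (my own example; the paths are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object SaveAsTextFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-as-text-file-sketch"))
    val points = sc.textFile("hdfs:///data/points") // hypothetical input
    // Writes part-00000, part-00001, ... one file per partition, under the given directory.
    points.map(line => s"Cluster output: $line").saveAsTextFile("hdfs:///user/me/results")
    sc.stop()
  }
}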

On Wed, Jun 10, 2015 at 4:44 AM, Pa Rö 
wrote:

> hi,
>
> i have an idea to solve my problem, i want write one file for each spark
> partion,
> but i not know to get the actuel partion suffix/ID in my call function?
>
> points.foreachPartition(
> new VoidFunction GeoTimeDataTupel>>>() {
>
> private static final long serialVersionUID =
> -7210897529331503565L;
>
> public void call(Iterator GeoTimeDataTupel>> entry)throws Exception {
> while(entry.hasNext()) {
> Tuple2 temp =
> entry.next();
>
> try {
> FileSystem fs = FileSystem.get(new
> URI(pro.getProperty("hdfs.namenode")),new Configuration());
> Path pt=new
> Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");
> }
> catch(Exception e) {
> e.printStackTrace();
> }
> }
> }
> }
> );
>
> 2015-06-09 15:34 GMT+02:00 Pa Rö :
>
>> hi community,
>>
>> i want append results to one file. if i work local my function build all
>> right,
>> if i run this on a yarn cluster, i lost same rows.
>>
>> here my function to write:
>>
>> points.foreach(
>> new VoidFunction>() {
>>
>> private static final long serialVersionUID =
>> 2459995649387229261L;
>>
>> public void call(Tuple2
>> entry)throws Exception {
>> try {
>> FileSystem fs = FileSystem.get(new
>> URI(pro.getProperty("hdfs.namenode")),new Configuration());
>> Path pt=new
>> Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results");
>>
>> if(fs.exists(pt)) {
>> FSDataInputStream in = fs.open(pt);
>> Path pt_temp = new
>> Path(fs.getHomeDirectory()+pro.getProperty("spark.output")+"/results_temp");
>> backup(fs.getConf(), fs, in, pt_temp);
>> in.close();
>>
>> FSDataOutputStream out = fs.create((pt),
>> true);
>> FSDataInputStream backup = fs.open(pt_temp);
>>
>> int offset = 0;
>> int bufferSize = 4096;
>>
>> int result = 0;
>>
>> byte[] buffer = new byte[bufferSize];
>> // pre read a part of content from input
>> stream
>> result = backup.read(offset, buffer, 0,
>> bufferSize);
>> // loop read input stream until it does not
>> fill whole size of buffer
>> while (result == bufferSize) {
>> out.write(buffer);
>> // read next segment from input stream by
>> moving the offset pointer
>> offset += bufferSize;
>> result = backup.read(offset, buffer, 0,
>> bufferSize);
>> }
>>
>> if (result > 0 && result < bufferSize) {
>> for (int i = 0; i < result; i++) {
>> out.write(buffer[i]);
>> }
>> }
>> out.writeBytes("Cluster: "+entry._1+", Point:
>> "+entry._2.toString()+"\n");
>> out.close();
>> }
>> else {
>> BufferedWriter bw =new BufferedWriter(new
>> OutputStreamWriter(fs.create(pt)));
>> bw.write("Cluster: "+entry._1+", Point:
>> "+entry._2.toString()+"\n");
>> bw.close();
>> }
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>> }
>>
>> public void backup(Configuration conf, FileSystem
>> fs,FSDataInputStream sourceContent, Path pt_temp) throws Exception {
>>
>> FSDataOutputStream out = fs.create(pt_temp, true);
>> IOUtils.copyBytes(sourceContent, out, 4096, false);
>> out.close();
>> }
>>
>> where is my fault?? or give it a function to write(append

Re: Executor memory allocations

2015-06-18 Thread Richard Marscher
It would be the "40%", although it's probably better to think of it as
shuffle vs. data cache and the remainder goes to tasks. As the comments for
the shuffle memory fraction configuration clarify that it will be taking
memory at the expense of the storage/data cache fraction:

spark.shuffle.memoryFraction (default: 0.2): Fraction of Java heap to use for
aggregation and cogroups during shuffles, if spark.shuffle.spill is true. At
any given time, the collective size of all in-memory maps used for shuffles
is bounded by this limit, beyond which the contents will begin to spill to
disk. If spills are often, consider increasing this value at the expense of
spark.storage.memoryFraction.
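
A minimal sketch of shifting memory between the two fractions (the values are
placeholders, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

object MemoryFractionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("memory-fraction-sketch")
      .set("spark.storage.memoryFraction", "0.5") // data cache share of the heap
      .set("spark.shuffle.memoryFraction", "0.3") // shuffle aggregation share
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).groupBy(_ % 10).count() // a shuffle to exercise the settings
    sc.stop()
  }
}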

On Wed, Jun 17, 2015 at 6:02 PM, Corey Nolet  wrote:

> So I've seen in the documentation that (after the overhead memory is
> subtracted), the memory allocations of each executor are as follows (assume
> default settings):
>
> 60% for cache
> 40% for tasks to process data
>
>
> Reading about how Spark implements shuffling, I've also seen it say "20%
> of executor memory is utilized for shuffles" Does this 20% cut into the 40%
> for tasks to process data or the 60% for the data cache?
>


Re: Multiple executors writing file using java filewriter

2015-06-22 Thread Richard Marscher
Is spoutLog just a non-Spark file writer? If you run that in the map call
on a cluster, it's going to be writing to the local filesystem of whichever
executor it runs on. I'm not sure if that's what you intended.
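
If the goal is a durable record of the message IDs rather than a local file on each
executor, one hedged alternative (my own sketch, not the poster's code; the path and
line format are placeholders) is to keep the records in the DStream and let Spark
write them out:

import org.apache.spark.streaming.dstream.DStream

object MsgIdLogSketch {
  // Write per-record log lines through Spark instead of a java.io.FileWriter inside
  // map(), so output lands in HDFS rather than on whichever executor ran the task.
  def logMessageIds(stream: DStream[String]): Unit = {
    stream.foreachRDD { (rdd, time) =>
      rdd.map(msg => s"${time.milliseconds},spout-MSGID,$msg")
         .saveAsTextFile(s"hdfs:///logs/spout-msgids/${time.milliseconds}") // one directory per batch
    }
  }
}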

On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla 
wrote:

> Running perfectly in local system but not writing to file in cluster mode 
> .ANY suggestions please ..
>
>
> //msgid is long counter
>
> JavaDStream  newinputStream=inputStream.map(new Function String>() {
> @Override
> public String call(String v1) throws Exception {
> String s1=msgId+"@"+v1;
> System.out.println(s1);
> msgId++;
> try {
> *//filewriter logic
> spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + 
> msgeditor.getMessageId(s1));*
> } catch (Exception e) {
>
> System.out.println("exeception is here");
> e.printStackTrace();
> throw e;
> }
> System.out.println("msgid,"+msgId);
> return  msgeditor.addMessageId(v1,msgId);
> }
> });
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>
> On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla 
> wrote:
>
>> Can not we  write some data to a txt file  in parallel with multiple
>> executors  running  in parallel ??
>>
>>
>> --
>> Thanks & Regards,
>> Anshu Shukla
>>
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: Limitations using SparkContext

2015-06-23 Thread Richard Marscher
Hi,

can you detail the symptom further? Was it that only 12 requests were
serviced and the other 440 timed out? I don't think that Spark is well
suited for this kind of workload, or at least the way it is being
represented. How long does a single request take Spark to complete?

Even with fair scheduling, you will only be able to have a fixed number of
tasks running on Spark at once. Usually this is bounded by the max cores
setting in configuration. Since you mention local as a comparison point, I
get the impression you are running Spark Standalone for the cluster. The
implication, if this is reflective of your current setup, is that you
aren't going to get much concurrency for separate spray requests. Let's say
your max cores is 16 and your number of tasks/partitions per stage of your
Spark DAG is 8. Then at any given time only 2 requests can be serviced. It
may also be the case that with fair scheduling a single request gets
pre-empted after completing one stage of the DAG and has to wait to
continue instead of proceeding directly to the next stage.

This hypothesis would also support the observation that local is no better
than the cluster, because you probably have even fewer concurrent Spark
tasks available on the single local machine.


spark.cores.max (default: not set): When running on a standalone deploy
cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum
amount of CPU cores to request for the application from across the cluster
(not from each machine). If not set, the default will be
spark.deploy.defaultCores on Spark's standalone cluster manager, or
infinite (all available cores) on Mesos.
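
As a concrete, hypothetical illustration of sizing for concurrency under this model
(the numbers are placeholders, not from the thread):

import org.apache.spark.{SparkConf, SparkContext}

object RequestConcurrencySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("request-concurrency-sketch")
      .set("spark.cores.max", "64")        // total cores this application may hold
      .set("spark.scheduler.mode", "FAIR") // interleave stages of concurrent requests
    val sc = new SparkContext(conf)

    // With 8 partitions per request and 64 cores, roughly 8 requests can run tasks at once.
    val perRequestPartitions = 8
    val result = sc.parallelize(1 to 100000, perRequestPartitions).map(_ * 2).count()
    println(result)
    sc.stop()
  }
}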

On Tue, Jun 23, 2015 at 12:44 PM, daunnc  wrote:

> So the situation is following: got a spray server, with a spark context
> available (fair scheduling in a cluster mode, via spark-submit). There are
> some http urls, which calling spark rdd, and collecting information from
> accumulo / hdfs / etc (using rdd). Noticed, that there is a sort of
> limitation, on requests:
>
> wrk -t8 -c50 -d30s "http://localhost:/…/";
> Running 30s test @ http://localhost:/…/
>   8 threads and 50 connections
>   Thread Stats   Avg  Stdev Max   +/- Stdev
> Latency 1.03s   523.30ms   1.70s50.00%
> Req/Sec 6.05  5.4920.00 71.58%
>   452 requests in 30.04s, 234.39KB read
>   Socket errors: connect 0, read 0, write 0, timeout 440
>
> So this happens on making some calls with spark rdd (not depends on called
> function), and in browser you can see ERR_EMPTY_RESPONSE
>
> Now the solution was to use cache, but want to know about this limitations,
> or mb some settings.
> This error happens in local mode and in cluster mode, so guess not depends
> on it.
>
> P.S. logs are clear (or simply don't know where to look, but stdout of a
> spar-submit in a client mode is clear).
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How Spark Execute chaining vs no chaining statements

2015-06-23 Thread Richard Marscher
There should be no difference, assuming you don't use the intermediately
stored RDD values you are creating for anything else (rdd1, rdd2). The
first example still creates these intermediate RDD objects; you are just
using them implicitly and not storing the value.

It's also worth pointing out that Spark is able to pipeline operations
together into stages. That is, it should effectively translate something
like map(f1).map(f2).map(f3) to map(f1 -> f2 -> f3) in pseudocode, if
you will. Here is a more detailed explanation from one of the committers
on SO:
http://stackoverflow.com/questions/19340808/spark-single-pipelined-scala-command-better-than-separate-commands
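
A small sketch (my own example) showing that the two styles produce the same
pipelined lineage:

import org.apache.spark.{SparkConf, SparkContext}

object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("chaining-sketch").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 10)

    // Chained: no intermediate vals; the maps are pipelined into one stage.
    val chained = rdd.map(_ + 1).map(_ * 2).map(_.toString)

    // Unchained: the intermediate RDD handles are just named; the DAG is identical.
    val rdd1 = rdd.map(_ + 1)
    val rdd2 = rdd1.map(_ * 2)
    val rdd3 = rdd2.map(_.toString)

    println(chained.toDebugString) // both lineages show the same pipelined maps
    println(rdd3.toDebugString)
    sc.stop()
  }
}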

On Tue, Jun 23, 2015 at 5:17 PM, Ashish Soni  wrote:

> Hi All ,
>
> What is difference between below in terms of execution to the cluster with
> 1 or more worker node
>
> rdd.map(...).map(...)...map(..)
>
> vs
>
> val rdd1 = rdd.map(...)
> val rdd2 = rdd1.map(...)
> val rdd3 = rdd2.map(...)
>
> Thanks,
> Ashish
>


Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
We've seen this issue as well in production. We also aren't sure what
causes it, but have recently shaded some of the Spark code in
TaskSchedulerImpl so that we can bubble up an exception from Spark instead
of letting it zombie in this situation. If you are interested I can go
into more detail about that. Otherwise I'm also keen to find out more about
how this might be happening.

On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder 
wrote:

> Hi,
>
> I have a really annoying issue that i cannot replicate consistently, still
> it happens every +- 100 submissions. (it's a job that's running every 3
> minutes).
> Already reported an issue for this:
> https://issues.apache.org/jira/browse/SPARK-8592
>
> Here are the Thread dump of the Driver and the Executor:
> https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ
>
> Any direction is should look into?
>
> Spark 1.4.0
> Java 1.8.0_45 (Oracle Corporation)
> Scala 2.11.6
>
> I already tried to resolve the NPE by not logging the ActorRef. This makes
> the NPE go away :)
>
> But  the root cause lies deeper I expect, since then the driver then still
> hangs with the "*WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient resources*" messages. But there are enough resources
> available in the cluster, it has plenty of CPU and Memory left.
>
> Logs from Driver:
>
> 15/06/26 11:58:19 INFO Remoting: Starting remoting
> 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@172.17.0.123:51415]
> 15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver'
> on port 51415.
> 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
> 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
> 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
> 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity
> 265.1 MB
> 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
> 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
> 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
> server' on port 33176.
> 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
> 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at
> http://172.17.0.123:4040
> 15/06/26 11:58:20 INFO SparkContext: Added JAR
> file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
> http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp
> 1435319900257
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
> akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
> cluster with app ID app-20150626115820-0917
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
> app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
> 10.0.7.171:47050) with 1 cores
> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID
> app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0
> GB RAM
> 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution
> thread
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
> app-20150626115820-0917/0 is now LOADING
> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
> app-20150626115820-0917/0 is now RUNNING
> 15/06/26 11:58:20 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
> 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
> 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
> 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
> manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
> 172.17.0.123, 52000)
> 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is
> ready for scheduling beginning after reached minRegisteredResourcesRatio:
> 0.0
> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
> 15/06/26 11:58:24 INFO SparkContext: Starting job: map at
> SparkProductEventAggregator.scala:144
> 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1
> [5cc3f53084]
> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
> {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
> 15/06/26 11:58:24 INFO DAGSchedul

Re: Spark driver hangs on start of job

2015-06-26 Thread Richard Marscher
Hi,

we are on 1.3.1 right now so in case there are differences in the Spark
files I'll walk through the logic of what we did and post a couple gists at
the end. We haven't committed to forking Spark for our own deployments yet,
so right now we shadow some Spark classes in our application code with our
versions of the classes. Keep in mind I am not a Spark committer so the
following is a best effort basis that is working for us. But it may be that
someone more knowledgable about the Spark codebase might see a pitfall to
my solution or a better solution.

--

First, we'll start with the root issue in TaskSchedulerImpl. You will find
the code that prints the "Initial job has not accepted any resources"
warning inside the "submitTasks" function. Spark creates a separate thread
that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
the submitted task set has been launched. It only posts the warn logging
here and does nothing. I will come back to this part of the code in a
moment.

The code that determines when the "hasLaunchedTask" flag gets set (and thus
closes out the starvation thread and the task set is being worked on by the
cluster) is within the "resourceOffers" function. The various Spark
Scheduler Backend classes will periodically call this function in
TaskSchedulerImpl until cluster resources have been assigned to the task
set.

To start signaling the zombied scenario, I created a new flag: "@volatile
private var hasZombied = false". In our experience we always get the
resources in resourceOffer before the starvation thread runs, otherwise we
have always hit the zombie scenario if resources weren't allocated yet. So
I added a conditional before the "if(tasks.size > 0) { hasLaunchedTask =
true }" block. The conditional checks "if(!hasLaunchedTask && hasZombied) {
dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in
a moment.

The last detail here is to add code inside the starvation thread block
after it posts the warning log: set "hasZombied" to true and then call
"this.cancel()" to stop the starvation thread from continuing to run. With
this we now have all the steps needed inside TaskSchedulerImpl to signal
the zombied condition.

Back to the custom function. DAGScheduler has a reference to the appropriate
Spark listeners that can propagate errors to the task set and, more
importantly, back to your application code. If you look at the DAGScheduler
class, you will find a function called "cleanUpAfterSchedulerStop()". This
function does everything we want, except it is hard coded to a specific
exception: "val error = new SparkException(...)". What I did was copy this
and make another function that returns a custom exception I created to
signal the zombie, something like
SparkTaskResourceAllocationZombieError. Now you call this function within
the conditional block in TaskSchedulerImpl.resourceOffers and you should
see your exception propagating out to your application code so you can take
appropriate actions.

In our case, we are submitting Spark applications programmatically from a
Scala application service on an EC2 instance to a Spark Standalone cluster
in EC2. Whenever we see this error, the application service EC2 instance is
unable to get resources from the cluster even when attempting subsequent
Spark applications for a long period of time (it eventually recovers hours
or days later but that is not useful for us). So in our case we need to
reschedule the failed Spark application on another EC2 application instance
and shut down this current EC2 instance because it can no longer get
cluster resources. Your use case may be different, but at least action can
be taken at an application level.

Here is some source code, you should be able to locate most of my additions
to the code by searching for comments starting with "// Localytics Code"
TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957
DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7

Regards,
Richard

On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder 
wrote:

> Hi Richard,
>
> I would  like to see how we can get a workaround to get out of the Zombie
> situation since were planning for production :)
>
> If you could share the workaround or point directions that would be great!
>
> Sjoerd
>
> 2015-06-26 16:53 GMT+02:00 Richard Marscher :
>
>> We've seen this issue as well in production. We also aren't sure what
>> causes it, but have just recently shaded some of the Spark code in
>> TaskSchedulerImpl that we use to effectively bubble up an exception from
>> Spark instead of zombie in this situation. If you are interested I can go
>> into more detail about that. Otherwise I'm also keen to find out mo

Re: Applying functions over certain count of tuples .

2015-06-29 Thread Richard Marscher
Hi,

not sure what the context is but I think you can do something similar with
mapPartitions:

rdd.mapPartitions { iterator =>

  iterator.grouped(5).map { tupleGroup => emitOneRddForGroup(tupleGroup) }

}

The edge case is when the final grouping doesn't have exactly 5 items, if
that matters.
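
A concrete, runnable variant of the sketch above, assuming `sc` is an existing
SparkContext; it sums every group of 5 elements within each partition (the
trailing group may be smaller than 5):

// chunk each partition's iterator into groups of 5 and reduce each group
val rdd = sc.parallelize(1 to 23, 2)
val groupSums = rdd.mapPartitions { iterator =>
  iterator.grouped(5).map(group => group.sum)
}
groupSums.collect().foreach(println)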

On Mon, Jun 29, 2015 at 3:57 PM, anshu shukla 
wrote:

> I want  to apply some logic   on the basis  of  a FIX count of number of
> tuples  in each RDD . *suppose emit one rdd for every 5  tuple of
>  previous  RDD . *
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: Spark driver hangs on start of job

2015-07-02 Thread Richard Marscher
Ah I see, glad that simple patch works for your problem. That seems to be a
different underlying problem than we have been experiencing. In our case,
the executors are failing properly; it's just that none of the new ones will
ever escape experiencing the same exact issue. So we start a death spiral
of thousands of failed executors, all of which can't connect with the
driver. Meanwhile, the driver just sits there in the "zombie" state doing
nothing while it waits for executors to respond. In that light, my solution
is geared towards solving things on the driver-side gracefully.


On Thu, Jul 2, 2015 at 4:37 AM, Sjoerd Mulder 
wrote:

> Hi Richard,
>
> I have actually applied the following fix to our 1.4.0 version and this
> seem to resolve the zombies :)
>
> https://github.com/apache/spark/pull/7077/files
>
> Sjoerd
>
> 2015-06-26 20:08 GMT+02:00 Richard Marscher :
>
>> Hi,
>>
>> we are on 1.3.1 right now so in case there are differences in the Spark
>> files I'll walk through the logic of what we did and post a couple gists at
>> the end. We haven't committed to forking Spark for our own deployments yet,
>> so right now we shadow some Spark classes in our application code with our
>> versions of the classes. Keep in mind I am not a Spark committer so the
>> following is a best effort basis that is working for us. But it may be that
>> someone more knowledgable about the Spark codebase might see a pitfall to
>> my solution or a better solution.
>>
>> --
>>
>> First, we'll start with the root issue in TaskSchedulerImpl. You will
>> find the code that prints the "Initial job has not accepted any resources"
>> warning inside the "submitTasks" function. Spark creates a separate thread
>> that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
>> the submitted task set has been launched. It only posts the warn logging
>> here and does nothing. I will come back to this part of the code in a
>> moment.
>>
>> The code that determines when the "hasLaunchedTask" flag gets set (and
>> thus closes out the starvation thread and the task set is being worked on
>> by the cluster) is within the "resourceOffers" function. The various Spark
>> Scheduler Backend classes will periodically call this function in
>> TaskSchedulerImpl until cluster resources have been assigned to the task
>> set.
>>
>> To start signaling the zombied scenario, I created a new flag: "@volatile
>> private var hasZombied = false". In our experience we always get the
>> resources in resourceOffer before the starvation thread runs, otherwise we
>> have always hit the zombie scenario if resources weren't allocated yet. So
>> I added a conditional before the "if(tasks.size > 0) { hasLaunchedTask =
>> true }" block. The conditional checks "if(!hasLaunchedTask && hasZombied) {
>> dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in
>> a moment.
>>
>> The last detail here is to add code inside the starvation thread block
>> after it posts the warning log. Set "hasZombied" to true and then call
>> "this.cancel()" to stop the starvation thread from continuing to run. With
>> this we now have all the steps needed inside TaskSchedulerImpl to start
>> signaling out the zombied condition.
>>
>> Back to the custom function. DAGScheduler has reference to the
>> appropriate Spark listeners that can propagate errors to the task set and
>> more importantly back to your application code. If you look at DAGScheduler
>> class, you will find a function called "cleanUpAfterSchedulerStop()". This
>> function does everything we want, except it is hard coded to a specific
>> exception "val error = new SparkException(...)". What I did was copy this
>> and made another function that returned a custom Exception I created that I
>> use to signal the zombie, something like
>> SparkTaskResourceAllocationZombieError. Now you call this function within
>> the conditional block in TaskSchedulerImpl.resourceOffers and you should
>> see your exception propagating out to your application code so you can take
>> appropriate actions.
>>
>> In our case, we are submitting Spark applications programmatically from a
>> Scala application service on an EC2 instance to a Spark Standalone cluster
>> in EC2. Whenever we see this error, the application service EC2 instance is
>> unable to get resources from the cluster even when attempting subsequent
>> Spark applications for a long period of ti

Re: How to create empty RDD

2015-07-06 Thread Richard Marscher
This should work

val output: RDD[(DetailInputRecord, VISummary)] =
sc.parallelize(Seq.empty[(DetailInputRecord, VISummary)])
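
Alternatively, SparkContext has an emptyRDD method, which avoids building an
empty collection on the driver first:

val output: RDD[(DetailInputRecord, VISummary)] =
  sc.emptyRDD[(DetailInputRecord, VISummary)]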

On Mon, Jul 6, 2015 at 5:11 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> I need to return an empty RDD of type
>
> val output: RDD[(DetailInputRecord, VISummary)]
>
>
>
> This does not work
>
> val output: RDD[(DetailInputRecord, VISummary)] = new RDD()
>
> as RDD is abstract class.
>
> How do i create empty RDD ?
> --
> Deepak
>
>


Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-07 Thread Richard Marscher
Would it be possible to have a wrapper class that just represents a
reference to a singleton holding the 3rd party object? It could proxy over
calls to the singleton object which will instantiate a private instance of
the 3rd party object lazily? I think something like this might work if the
workers have the singleton object in their classpath.

here's a rough sketch of what I was thinking:

object ThirdPartySingleton {
  private lazy val thirdPartyObj = ... // construct the 3rd party object here

  def someProxyFunction() = thirdPartyObj.someMethod() // hypothetical call on the 3rd party object
}

class ThirdPartyReference extends Serializable {
  def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
}

also found this SO post:
http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers


On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg 
wrote:

> Hi,
>
> I am seeing a lot of posts on singletons vs. broadcast variables, such as
> *
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
> *
>
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>
> What's the best approach to instantiate an object once and have it be
> reused
> by the worker(s).
>
> E.g. I have an object that loads some static state such as e.g. a
> dictionary/map, is a part of 3rd party API and is not serializable.  I
> can't
> seem to get it to be a singleton on the worker side as the JVM appears to
> be
> wiped on every request so I get a new instance.  So the singleton doesn't
> stick.
>
> Is there an approach where I could have this object or a wrapper of it be a
> broadcast var? Can Kryo get me there? would that basically mean writing a
> custom serializer?  However, the 3rd party object may have a bunch of
> member
> vars hanging off it, so serializing it properly may be non-trivial...
>
> Any pointers/hints greatly appreciated.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-08 Thread Richard Marscher
Ah, I see this is streaming. I haven't any practical experience with that
side of Spark. But the foreachPartition idea is a good approach. I've used
that pattern extensively, though not for singletons, but rather to
create non-serializable objects like API and DB clients on the executor
side. I think it's the most straightforward approach to dealing with any
non-serializable object you need.

I don't entirely follow what over-network data shuffling effects you are
alluding to (maybe more specific to streaming?).
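
For reference, a minimal sketch of that foreachPartition pattern, with a
hypothetical non-serializable ApiClient standing in for the 3rd party object
and `rdd` assumed to be an RDD[String]:

// the client is constructed on the executor once per partition, so it never
// needs to be serialized with the closure
class ApiClient {
  def send(record: String): Unit = println(s"sent $record")
  def close(): Unit = ()
}

rdd.foreachPartition { records =>
  val client = new ApiClient()
  try {
    records.foreach(client.send)
  } finally {
    client.close()
  }
}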

On Wed, Jul 8, 2015 at 9:41 AM, Dmitry Goldenberg 
wrote:

> My singletons do in fact stick around. They're one per worker, looks
> like.  So with 4 workers running on the box, we're creating one singleton
> per worker process/jvm, which seems OK.
>
> Still curious about foreachPartition vs. foreachRDD though...
>
> On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher <
> rmarsc...@localytics.com> wrote:
>
>> Would it be possible to have a wrapper class that just represents a
>> reference to a singleton holding the 3rd party object? It could proxy over
>> calls to the singleton object which will instantiate a private instance of
>> the 3rd party object lazily? I think something like this might work if the
>> workers have the singleton object in their classpath.
>>
>> here's a rough sketch of what I was thinking:
>>
>> object ThirdPartySingleton {
>>   private lazy val thirdPartyObj = ...
>>
>>   def someProxyFunction() = thirdPartyObj.()
>> }
>>
>> class ThirdPartyReference extends Serializable {
>>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
>> }
>>
>> also found this SO post:
>> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>>
>>
>> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg 
>> wrote:
>>
>>> Hi,
>>>
>>> I am seeing a lot of posts on singletons vs. broadcast variables, such as
>>> *
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>>> *
>>>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>>
>>> What's the best approach to instantiate an object once and have it be
>>> reused
>>> by the worker(s).
>>>
>>> E.g. I have an object that loads some static state such as e.g. a
>>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>>> can't
>>> seem to get it to be a singleton on the worker side as the JVM appears
>>> to be
>>> wiped on every request so I get a new instance.  So the singleton doesn't
>>> stick.
>>>
>>> Is there an approach where I could have this object or a wrapper of it
>>> be a
>>> broadcast var? Can Kryo get me there? would that basically mean writing a
>>> custom serializer?  However, the 3rd party object may have a bunch of
>>> member
>>> vars hanging off it, so serializing it properly may be non-trivial...
>>>
>>> Any pointers/hints greatly appreciated.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Create RDD from output of unix command

2015-07-08 Thread Richard Marscher
As a distributed data processing engine, Spark should be fine with millions
of lines. It's built with the idea of massive data sets in mind. Do you
have more details on how you anticipate the output of a unix command
interacting with a running Spark application? Do you expect Spark to be
continuously running and somehow observe unix command outputs? Or are you
thinking more along the lines of running a unix command with output and
then taking whatever format that is and running a spark job against it? If
it's the latter, it should be as simple as writing the command output to a
file and then loading the file into an RDD in Spark.
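
A small sketch of that latter approach, assuming a SparkContext `sc` is already
available (the command and path are only examples):

import scala.sys.process._

val outputPath = "/tmp/command_output.txt"
// run the command and redirect its stdout to a file; on a real cluster this
// file would need to live somewhere all executors can read, e.g. HDFS or S3
("ls -l /var/log" #> new java.io.File(outputPath)).!
// each line of the file becomes one element of the RDD
val lines = sc.textFile(outputPath)
println(lines.count())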

On Wed, Jul 8, 2015 at 2:02 PM, foobar  wrote:

> What's the best practice of creating RDD from some external unix command
> output? I assume if the output size is large (say millions of lines),
> creating RDD from an array of all lines is not a good idea? Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Spark serialization in closure

2015-07-09 Thread Richard Marscher
Reading that article and applying it to your observations of what happens
at runtime:

shouldn't the closure require serializing testing? The foo singleton object
is a member of testing, and then you call this foo value in the closure
func and further in the foreachPartition closure. So, following that article,
Scala will attempt to serialize the containing object/class testing to get the
foo instance.
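
A common way to avoid dragging the enclosing object into the closure is to copy
the needed value into a local val first, so the closure captures only that
local; a minimal sketch of the same example:

object testing {
  object foo extends Serializable {
    val v = 42
  }
  val list = List(1, 2, 3)
  val rdd = sc.parallelize(list)
  def func = {
    // local copy: the closure now captures an Int, not the enclosing testing object
    val fooV = foo.v
    val after = rdd.foreachPartition { it =>
      println(fooV)
    }
  }
}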

On Thu, Jul 9, 2015 at 4:11 PM, Chen Song  wrote:

> Repost the code example,
>
> object testing extends Serializable {
> object foo {
>   val v = 42
> }
> val list = List(1,2,3)
> val rdd = sc.parallelize(list)
> def func = {
>   val after = rdd.foreachPartition {
> it => println(foo.v)
>   }
> }
>   }
>
> On Thu, Jul 9, 2015 at 4:09 PM, Chen Song  wrote:
>
>> Thanks Erik. I saw the document too. That is why I am confused because as
>> per the article, it should be good as long as *foo *is serializable.
>> However, what I have seen is that it would work if *testing* is
>> serializable, even foo is not serializable, as shown below. I don't know if
>> there is something specific to Spark.
>>
>> For example, the code example below works.
>>
>> object testing extends Serializable {
>>
>> object foo {
>>
>>   val v = 42
>>
>> }
>>
>> val list = List(1,2,3)
>>
>> val rdd = sc.parallelize(list)
>>
>> def func = {
>>
>>   val after = rdd.foreachPartition {
>>
>> it => println(foo.v)
>>
>>   }
>>
>> }
>>
>>   }
>>
>> On Thu, Jul 9, 2015 at 12:11 PM, Erik Erlandson  wrote:
>>
>>> I think you have stumbled across this idiosyncrasy:
>>>
>>>
>>> http://erikerlandson.github.io/blog/2015/03/31/hygienic-closures-for-scala-function-serialization/
>>>
>>>
>>>
>>>
>>> - Original Message -
>>> > I am not sure this is more of a question for Spark or just Scala but I
>>> am
>>> > posting my question here.
>>> >
>>> > The code snippet below shows an example of passing a reference to a
>>> closure
>>> > in rdd.foreachPartition method.
>>> >
>>> > ```
>>> > object testing {
>>> > object foo extends Serializable {
>>> >   val v = 42
>>> > }
>>> > val list = List(1,2,3)
>>> > val rdd = sc.parallelize(list)
>>> > def func = {
>>> >   val after = rdd.foreachPartition {
>>> > it => println(foo.v)
>>> >   }
>>> > }
>>> >   }
>>> > ```
>>> > When running this code, I got an exception
>>> >
>>> > ```
>>> > Caused by: java.io.NotSerializableException:
>>> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$
>>> > Serialization stack:
>>> > - object not serializable (class:
>>> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$, value:
>>> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$@10b7e824)
>>> > - field (class:
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
>>> > name: $outer, type: class
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$)
>>> > - object (class
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$testing$$anonfun$1,
>>> > )
>>> > ```
>>> >
>>> > It looks like Spark needs to serialize `testing` object. Why is it
>>> > serializing testing even though I only pass foo (another serializable
>>> > object) in the closure?
>>> >
>>> > A more general question is, how can I prevent Spark from serializing
>>> the
>>> > parent class where RDD is defined, with still support of passing in
>>> > function defined in other classes?
>>> >
>>> > --
>>> > Chen Song
>>> >
>>>
>>
>>
>>
>> --
>> Chen Song
>>
>>
>
>
> --
> Chen Song
>
>


-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Apache Spark : Custom function for reduceByKey - missing arguments for method

2015-07-10 Thread Richard Marscher
Did you try it by adding the `_` after the method names to partially apply
them? Scala is saying that its trying to immediately apply those methods
but can't find arguments.  But you instead are trying to pass them along as
functions (which they aren't). Here is a link to a stackoverflow answer
that should help clarify: http://stackoverflow.com/a/19720808/72401. I
think there are two solutions: turn getMax and getMin into function values
using val, like this:

val getMax: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a,b)
=>
  if (a > b) a
  else b
}

val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a,b)
=>
  if (a < b) a
  else b
}

or just partially apply them:

maxVector = attribMap.reduceByKey( getMax _)
minVector = attribMap.reduceByKey( getMin _)

On Thu, Jul 9, 2015 at 9:09 PM, ameyamm  wrote:

> I am trying to normalize a dataset (convert values for all attributes in
> the
> vector to "0-1" range). I created an RDD of tuple (attrib-name,
> attrib-value) for all the records in the dataset as follows:
>
> val attribMap : RDD[(String,DoubleDimension)] = contactDataset.flatMap(
>   contact => {
> List(
>   ("dage",contact.dage match { case Some(value)
> => DoubleDimension(value) ; case None => null }),
>   ("dancstry1",contact.dancstry1 match { case
> Some(value) => DoubleDimension(value) ; case None => null }),
>   ("dancstry2",contact.dancstry2 match { case
> Some(value) => DoubleDimension(value) ; case None => null }),
>   ("ddepart",contact.ddepart match { case
> Some(value) => DoubleDimension(value) ; case None => null }),
>   ("dhispanic",contact.dhispanic match { case
> Some(value) => DoubleDimension(value) ; case None => null }),
>   ("dhour89",contact.dhour89 match { case
> Some(value) => DoubleDimension(value) ; case None => null })
> )
>   }
> )
>
> Here, contactDataset is of the type RDD[Contact]. The fields of Contact
> class are of type Option[Long].
>
> DoubleDimension is a simple wrapper over Double datatype. It extends the
> Ordered trait and implements corresponding compare method and equals
> method.
>
> To obtain the max and min attribute vector for computing the normalized
> values,
>
> maxVector = attribMap.reduceByKey( getMax )
> minVector = attribMap.reduceByKey( getMin )
>
> Implementation of getMax and getMin is as follows:
>
> def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension =
> {
> if (a > b) a
> else b
> }
>
> def getMin( a : DoubleDimension, b : DoubleDimension) : DoubleDimension = {
> if (a < b) a
> else b
> }
>
> I get a compile error at calls to the methods getMax and getMin stating:
>
> [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error:
> missing arguments for method getMax in class DatasetReader;
>
> [ERROR] follow this method with '_' if you want to treat it as a partially
> applied function
>
> [ERROR] maxVector = attribMap.reduceByKey( getMax )
>
> [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error:
> missing arguments for method getMin in class DatasetReader;
>
> [ERROR] follow this method with '_' if you want to treat it as a partially
> applied function
>
> [ERROR] minVector = attribMap.reduceByKey( getMin )
>
> I am not sure what I am doing wrong here. My RDD is an RDD of Pairs and as
> per my knowledge, I can pass any method to it as long as the functions is
> of
> the type f : (V, V) => V.
>
> I am really stuck here. Please help.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Unit tests of spark application

2015-07-10 Thread Richard Marscher
Unless you had something specific in mind, it should be as simple as
creating a SparkContext object with a master of local[2] in your tests.
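
For example, a minimal sketch using ScalaTest (the test framework and names
here are just illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordCountSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
  }

  override def afterAll(): Unit = {
    sc.stop()
  }

  test("reduceByKey counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts("a") == 2)
  }
}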

On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire 
wrote:

> Hi,
>
> I want to write junit test cases in scala for testing spark application.
> Is there any guide or link which I can refer.
>
> Thank you very much.
>
> -Naveen
>
>
>
>


-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: sbt-assembly spark-streaming-kinesis-asl error

2015-04-14 Thread Richard Marscher
Hi,

I've gotten an application working with sbt-assembly and Spark, so I thought I'd
present an option. In my experience, trying to bundle any of the Spark
libraries in your uber jar is going to be a major pain. There will be a lot of
deduplication conflicts to work through, and even if you resolve them it is
easy to do so incorrectly. I considered it an intractable problem. So the
alternative is to not include those jars in your uber jar. For this to work
you will need the same libraries on the classpath of your Spark cluster and
your driver program (if you are running that as an application and not just
using spark-submit).
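
As a rough sketch of what that looks like in build.sbt (artifact names and
versions are illustrative, match them to your cluster):

libraryDependencies ++= Seq(
  // Spark is already on the cluster and driver classpath, so keep it out of
  // the assembly by marking it "provided"
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
  // application-only dependencies stay in the uber jar
  "joda-time" % "joda-time" % "2.7"
)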

As for your NoClassDefFoundError, you either are missing Joda Time in your
runtime classpath or have conflicting versions. It looks like something
related to AWS wants to use it. Check your uber jar to see if it's including
the org/joda/time classes, as well as the classpath of your Spark cluster. For
example: I use the Spark 1.3.0 on Hadoop 1.x, which in the 'lib' directory
has an uber jar spark-assembly-1.3.0-hadoop1.0.4.jar. At one point in Spark
1.2 I found a conflict between httpclient versions that my uber jar pulled
in for AWS libraries and the one bundled in the spark uber jar. I hand
patched the spark uber jar to remove the offending httpclient bytecode to
resolve the issue. You may be facing a similar situation.

I hope that gives some ideas for resolving your issue.

Regards,
Rich

On Tue, Apr 14, 2015 at 1:14 PM, Mike Trienis 
wrote:

> Hi Vadim,
>
> After removing "provided" from "org.apache.spark" %%
> "spark-streaming-kinesis-asl" I ended up with huge number of deduplicate
> errors:
>
> https://gist.github.com/trienism/3d6f8d6b7ff5b7cead6a
>
> It would be nice if you could share some pieces of your mergeStrategy code
> for reference.
>
> Also, after adding "provided" back to "spark-streaming-kinesis-asl" and I
> submit the spark job with the spark-streaming-kinesis-asl jar file
>
> sh /usr/lib/spark/bin/spark-submit --verbose --jars
> lib/spark-streaming-kinesis-asl_2.10-1.2.0.jar --class com.xxx.DataConsumer
> target/scala-2.10/xxx-assembly-0.1-SNAPSHOT.jar
>
> I still end up with the following error...
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeFormat
> at com.amazonaws.auth.AWS4Signer.(AWS4Signer.java:44)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
>
> Has anyone else run into this issue?
>
>
>
> On Mon, Apr 13, 2015 at 6:46 PM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> I don't believe the Kinesis asl should be provided. I used mergeStrategy
>> successfully to produce an "uber jar."
>>
>> Fyi, I've been having trouble consuming data out of Kinesis with Spark
>> with no success :(
>> Would be curious to know if you got it working.
>>
>> Vadim
>>
>> On Apr 13, 2015, at 9:36 PM, Mike Trienis 
>> wrote:
>>
>> Hi All,
>>
>> I am having trouble building a fat jar file through sbt-assembly.
>>
>> [warn] Merging 'META-INF/NOTICE.txt' with strategy 'rename'
>> [warn] Merging 'META-INF/NOTICE' with strategy 'rename'
>> [warn] Merging 'META-INF/LICENSE.txt' with strategy 'rename'
>> [warn] Merging 'META-INF/LICENSE' with strategy 'rename'
>> [warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
>> [warn] Merging
>> 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.properties' with
>> strategy 'discard'
>> [warn] Merging
>> 'META-INF/maven/com.thoughtworks.paranamer/paranamer/pom.xml' with strategy
>> 'discard'
>> [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.properties'
>> with strategy 'discard'
>> [warn] Merging 'META-INF/maven/commons-dbcp/commons-dbcp/pom.xml' with
>> strategy 'discard'
>> [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.properties'
>> with strategy 'discard'
>> [warn] Merging 'META-INF/maven/commons-pool/commons-pool/pom.xml' with
>> strategy 'discard'
>> [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.properties' with
>> strategy 'discard'
>> [warn] Merging 'META-INF/maven/joda-time/joda-time/pom.xml' with strategy
>> 'discard'
>> [warn] Merging 'META-INF/maven/log4j/log4j/pom.properties' with strategy
>> 'discard'
>> [warn] Merging 'META-INF/maven/log4j/log4j/pom.xml' with strategy
>> 'discard'
>> [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.properties' with
>> strategy 'discard'
>> [warn] Merging 'META-INF/maven/org.joda/joda-convert/pom.xml' with
>> strategy 'discard'
>> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.properties' with
>> strategy 'discard'
>> [warn] Merging 'META-INF/maven/org.slf4j/slf4j-api/pom.xml' with strategy
>> 'discard'
>> [warn] Merging 'META-INF/maven/org.sl

Re: Problem with Spark SQL UserDefinedType and sbt assembly

2015-04-16 Thread Richard Marscher
If it fails with sbt-assembly but not without it, then there's always the
likelihood of a classpath issue. What dependencies are you rolling up into
your assembly jar?

On Thu, Apr 16, 2015 at 4:46 PM, Jaonary Rabarisoa 
wrote:

> Any ideas ?
>
> On Thu, Apr 16, 2015 at 5:04 PM, Jaonary Rabarisoa 
> wrote:
>
>> Dear all,
>>
>> Here is an issue that gets me mad. I wrote a UserDefineType in order to
>> be able to store a custom type in a parquet file. In my code I just create
>> a DataFrame with my custom data type and write in into a parquet file. When
>> I run my code directly inside idea every thing works like a charm. But when
>> I create the assembly jar with sbt assembly and run the same code with
>> spark-submit I get the following error :
>>
>> *15/04/16 17:02:17 ERROR Executor: Exception in task 0.0 in stage 0.0
>> (TID 0)*
>> *java.lang.IllegalArgumentException: Unsupported dataType:
>> {"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]},
>> [1.1] failure: `TimestampType' expected but `{' found*
>>
>>
>> *{"type":"struct","fields":[{"name":"metadata","type":{"type":"udt","class":"org.apache.spark.vision.types.ImageMetadataUDT","pyClass":null,"sqlType":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"encoding","type":"string","nullable":true,"metadata":{}},{"name":"cameraId","type":"string","nullable":true,"metadata":{}},{"name":"timestamp","type":"string","nullable":true,"metadata":{}},{"name":"frameId","type":"string","nullable":true,"metadata":{}}]}},"nullable":true,"metadata":{}}]}*
>> *^*
>> *at
>> org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)*
>> *at
>> org.apache.spark.sql.types.DataType$.fromCaseClassString(dataTypes.scala:98)*
>> *at
>> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)*
>> *at
>> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$6.apply(ParquetTypes.scala:402)*
>> *at scala.util.Try.getOrElse(Try.scala:77)*
>> *at
>> org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromString(ParquetTypes.scala:402)*
>> *at
>> org.apache.spark.sql.parquet.RowWriteSupport.init(ParquetTableSupport.scala:145)*
>> *at
>> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:278)*
>> *at
>> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)*
>> *at org.apache.spark.sql.parquet.ParquetRelation2.org
>> $apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:691)*
>> *at
>> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)*
>> *at
>> org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:713)*
>> *at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)*
>> *at org.apache.spark.scheduler.Task.run(Task.scala:64)*
>> *at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)*
>> *at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)*
>> *at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)*
>> *at java.lang.Thread.run(Thread.java:745)*
>>
>>
>


Re: Instantiating/starting Spark jobs programmatically

2015-04-21 Thread Richard Marscher
Could you possibly describe what you are trying to learn how to do in more
detail? Some basics of submitting programmatically:

- Create a SparkContext instance and use that to build your RDDs
- You can only have 1 SparkContext per JVM you are running, so if you need
to satisfy concurrent job requests you would need to manage a SparkContext
as a shared resource on that server. Keep in mind if something goes wrong
with that SparkContext, all running jobs would probably be in a failed
state and you'd need to try to get a new SparkContext.
- There are System.exit calls built into Spark as of now that could kill
your running JVM. We have shadowed some of the most offensive bits within
our own application to work around this. You'd likely want to do that or to
do your own Spark fork. For example, if the SparkContext can't connect to
your cluster master node when it is created, it will System.exit.
- You'll need to provide all of the relevant classes that your platform
uses in the jobs on the classpath of the spark cluster. We do this with a
JAR file loaded from S3 dynamically by a SparkContext, but there are other
options.
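
A bare-bones sketch of the first two points, sharing one lazily created
SparkContext across job requests (the master URL and jar path are
placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object SharedSpark {
  // one SparkContext per JVM, shared by all job requests in this service
  lazy val sc: SparkContext = {
    val conf = new SparkConf()
      .setMaster("spark://your-master:7077")
      .setAppName("analytics-service")
      .setJars(Seq("/path/to/your-jobs-assembly.jar")) // classes your jobs need on the cluster
    new SparkContext(conf)
  }
}

object Jobs {
  // a "job" is then just code run against the shared context
  def wordCount(lines: Seq[String]): Map[String, Int] =
    SharedSpark.sc.parallelize(lines)
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
      .toMap
}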

On Mon, Apr 20, 2015 at 10:12 PM, firemonk9 
wrote:

> I have built a data analytics SaaS platform by creating Rest end points and
> based on the type of job request I would invoke the necessary spark
> job/jobs
> and return the results as json(async). I used yarn-client mode to submit
> the
> jobs to yarn cluster.
>
> hope this helps.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Instantiating-starting-Spark-jobs-programmatically-tp22577p22584.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Group by order by

2015-04-27 Thread Richard Marscher
Hi,

that error seems to indicate the basic query is not properly expressed. If
you group by just ID, then that means it would need to aggregate all the
time values into one value per ID, so you can't sort by it. Thus it tries
to suggest an aggregate function for time so you can have 1 value per ID
and properly sort it.

On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander 
wrote:

>  Hi,
>
>
>
> Could you suggest what is the best way to do “group by x order by y” in
> Spark?
>
>
>
> When I try to perform it with Spark SQL I get the following error (Spark
> 1.3):
>
>
>
> val results = sqlContext.sql("select * from sample group by id order by
> time")
>
> org.apache.spark.sql.AnalysisException: expression 'time' is neither
> present in the group by, nor is it an aggregate function. Add to group by
> or wrap in first() if you don't care which value you get.;
>
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)
>
>
>
> Is there a way to do it with just RDD?
>
>
>
> Best regards, Alexander
>


Re: Group by order by

2015-04-27 Thread Richard Marscher
It's not related to Spark, but to the concept of what you are trying to do
with the data. Grouping by ID means consolidating the data for each ID down to
1 row per ID. You can sort by time after this point, yes, but you would need
to either take each ID and time value pair OR do some aggregate operation
on the time. That's what the error message is explaining. Maybe you can
describe what you want your results to look like?

Here is some detail about the underlying operations here:

Example Data:
ID | Time     | SomeVal
1  | 02-02-15 | 4
1  | 02-03-15 | 5
2  | 02-02-15 | 4
2  | 02-02-15 | 5
2  | 02-05-15 | 2

A.

So if you do Group By ID this means 1 row per ID like below:

ID

1
2

To include Time in this projection you need to aggregate it with a function
to a single value. Then and only then can you use it in the projection and
sort on it.

"SELECT id, max(time) FROM sample GROUP BY id SORT BY max(time) desc;"

ID | max(time)
2  | 02-05-15
1  | 02-03-15

B.

Or if you do Group by ID, time then you get 1 row per ID and time pair:

ID | Time
1  | 02-02-15
1  | 02-03-15
2  | 02-02-15
2  | 02-05-15

Notice both rows with ID `2` and time `02-02-15` group down to 1 row in the
results here. In this case you can sort the results by time without using
an aggregate function.

"SELECT id, time FROM sample GROUP BY id, time SORT BY time desc;"

ID | Time
2  | 02-05-15
1  | 02-03-15
1  | 02-02-15
2  | 02-02-15


On Mon, Apr 27, 2015 at 3:28 PM, Ulanov, Alexander 
wrote:

>  Hi Richard,
>
>
>
> There are several values of time per id. Is there a way to perform group
> by id and sort by time in Spark?
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Richard Marscher [mailto:rmarsc...@localytics.com]
> *Sent:* Monday, April 27, 2015 12:20 PM
> *To:* Ulanov, Alexander
> *Cc:* user@spark.apache.org
> *Subject:* Re: Group by order by
>
>
>
> Hi,
>
>
>
> that error seems to indicate the basic query is not properly expressed. If
> you group by just ID, then that means it would need to aggregate all the
> time values into one value per ID, so you can't sort by it. Thus it tries
> to suggest an aggregate function for time so you can have 1 value per ID
> and properly sort it.
>
>
>
> On Mon, Apr 27, 2015 at 3:07 PM, Ulanov, Alexander <
> alexander.ula...@hp.com> wrote:
>
>  Hi,
>
>
>
> Could you suggest what is the best way to do “group by x order by y” in
> Spark?
>
>
>
> When I try to perform it with Spark SQL I get the following error (Spark
> 1.3):
>
>
>
> val results = sqlContext.sql("select * from sample group by id order by
> time")
>
> org.apache.spark.sql.AnalysisException: expression 'time' is neither
> present in the group by, nor is it an aggregate function. Add to group by
> or wrap in first() if you don't care which value you get.;
>
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis.failAnalysis(CheckAnalysis.scala:37)
>
>
>
> Is there a way to do it with just RDD?
>
>
>
> Best regards, Alexander
>
>
>


Re: Scalability of group by

2015-04-28 Thread Richard Marscher
Hi,

I can offer a few ideas to investigate in regards to your issue here. I've
run into resource issues doing shuffle operations with a much smaller
dataset than 2B. The data is going to be saved to disk by the BlockManager
as part of the shuffle and then redistributed across the cluster as
relevant to the group by. So the data is going to be replicated during the
operation.

I might suggest trying to allocate more memory for your executors in your
cluster. You might also want to look into configuring more explicitly the
shuffle functionality (
https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior).
Check the disk usage on the worker nodes; in our case we actually had little
disk space to start with and were running out of temporary space for the
shuffle operation.

I believe you should also be able to find more clear errors in logs from
the worker nodes if you haven't checked yet.
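
A few illustrative knobs to experiment with (the values are examples only, not
recommendations); since the group by goes through Spark SQL, the shuffle
partition count is also relevant:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "10g")          // more memory per executor
  .set("spark.sql.shuffle.partitions", "400")   // partitions used by Spark SQL shuffles (default 200)
  .set("spark.local.dir", "/mnt/spark-tmp")     // scratch directory used for shuffle spill files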

On Mon, Apr 27, 2015 at 10:02 PM, Ulanov, Alexander  wrote:

>  It works on a smaller dataset of 100 rows. Probably I could find the
> size when it fails using binary search. However, it would not help me
> because I need to work with 2B rows.
>
>
>
> *From:* ayan guha [mailto:guha.a...@gmail.com]
> *Sent:* Monday, April 27, 2015 6:58 PM
> *To:* Ulanov, Alexander
> *Cc:* user@spark.apache.org
> *Subject:* Re: Scalability of group by
>
>
>
> Hi
>
> Can you test on a smaller dataset to identify if it is cluster issue or
> scaling issue in spark
>
> On 28 Apr 2015 11:30, "Ulanov, Alexander"  wrote:
>
>  Hi,
>
>
>
> I am running a group by on a dataset of 2B of RDD[Row [id, time, value]]
> in Spark 1.3 as follows:
>
> “select id, time, first(value) from data group by id, time”
>
>
>
> My cluster is 8 nodes with 16GB RAM and one worker per node. Each executor
> is allocated with 5GB of memory. However, all executors are being lost
> during the query execution and I get “ExecutorLostFailure”.
>
>
>
> Could you suggest what might be the reason for it? Could it be that “group
> by” is implemented as RDD.groupBy so it holds the group by result in
> memory? What is the workaround?
>
>
>
> Best regards, Alexander
>
>


Re: Extra stage that executes before triggering computation with an action

2015-04-29 Thread Richard Marscher
I'm not sure, but I wonder whether the Spark REPL is not representative of a
normal runtime execution and is eagerly running a partial DAG once you define
an operation that would cause a shuffle.

What happens if you set up the same set of commands [a-e] in a file and use
the Spark REPL's `:load` or `:paste` command to load them all at once?

On Wed, Apr 29, 2015 at 2:55 PM, Tom Hubregtsen 
wrote:

> Thanks for the responses.
>
> "Try removing toDebugString and see what happens. "
>
> The toDebugString is performed after [d] (the action), as [e]. By then all
> stages are already executed.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Extra-stage-that-executes-before-triggering-computation-with-an-action-tp22707p22712.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-05-04 Thread Richard Marscher
Regarding the large GC pauses: assuming you allocated all 100GB of memory per
worker, you may consider running with less memory on your Worker nodes, or
splitting the available memory on the Worker nodes amongst several worker
instances. The JVM's garbage collection starts to become very slow as the heap
becomes large; at 100GB it would not be unusual to see GC take minutes at a
time. I believe with YARN or Standalone clusters you should be able to run
multiple smaller JVM instances on your workers, so you can still use your
cluster resources but won't have a single JVM managing an unwieldy amount of
memory.
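
For a Standalone cluster, splitting each box into several smaller workers can
be done in conf/spark-env.sh; illustrative values for a 100GB machine:

# run four worker JVMs per machine instead of one very large one
SPARK_WORKER_INSTANCES=4
SPARK_WORKER_MEMORY=24g
SPARK_WORKER_CORES=8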

On Mon, May 4, 2015 at 2:23 AM, Nick Travers  wrote:

> Could you be more specific in how this is done?
>
> A DataFrame class doesn't have that method.
>
> On Sun, May 3, 2015 at 11:07 PM, ayan guha  wrote:
>
>> You can use custom partitioner to redistribution using partitionby
>> On 4 May 2015 15:37, "Nick Travers"  wrote:
>>
>>> I'm currently trying to join two large tables (order 1B rows each) using
>>> Spark SQL (1.3.0) and am running into long GC pauses which bring the job
>>> to
>>> a halt.
>>>
>>> I'm reading in both tables using a HiveContext with the underlying files
>>> stored as Parquet Files. I'm using  something along the lines of
>>> HiveContext.sql("SELECT a.col1, b.col2 FROM a JOIN b ON a.col1 =
>>> b.col1") to
>>> set up the join.
>>>
>>> When I execute this (with an action such as .count) I see the first few
>>> stages complete, but the job eventually stalls. The GC counts keep
>>> increasing for each executor.
>>>
>>> Running with 6 workers, each with 2T disk and 100GB RAM.
>>>
>>> Has anyone else run into this issue? I'm thinking I might be running into
>>> issues with the shuffling of the data, but I'm unsure of how to get
>>> around
>>> this? Is there a way to redistribute the rows based on the join key
>>> first,
>>> and then do the join?
>>>
>>> Thanks in advance.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Dataset Outer Join vs RDD Outer Join

2016-06-01 Thread Richard Marscher
Hi,

I've been working on transitioning from RDD to Datasets in our codebase in
anticipation of being able to leverage features of 2.0.

I'm having a lot of difficulties with the impedance mismatches between how
outer joins worked with RDD versus Dataset. The Dataset joins feel like a
big step backwards IMO. With RDD, leftOuterJoin would give you Option types
of the results from the right side of the join. This follows idiomatic
Scala avoiding nulls and was easy to work with.

Now with Dataset there is only joinWith where you specify the join type,
but it lost all the semantics of identifying missing data from outer joins.
I can write some enriched methods on Dataset with an implicit class to
abstract messiness away if Dataset nulled out all mismatching data from an
outer join, however the problem goes even further in that the values aren't
always null. Integer, for example, defaults to -1 instead of null. Now it's
completely ambiguous what data in the join was actually there versus
populated via this atypical semantic.

Are there additional options available to work around this issue? I can
convert to RDD and back to Dataset but that's less than ideal.
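
To make the contrast concrete, a minimal sketch against toy data, assuming a
SparkContext `sc`, a SQLContext, and its implicits are in scope (this mirrors
the 1.6 / 2.0-preview APIs):

// RDD: a miss on the right side is an explicit None
val leftRDD  = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
val rightRDD = sc.parallelize(Seq(1 -> 10))
val rddJoined = leftRDD.leftOuterJoin(rightRDD)   // RDD[(Int, (String, Option[Int]))]

// Dataset: joinWith hands back the raw right-side tuple, so a miss shows up
// as nulls/defaults rather than an Option
val leftDS  = Seq(1 -> "a", 2 -> "b").toDS().as("left")
val rightDS = Seq(1 -> 10).toDS().as("right")
val dsJoined = leftDS.joinWith(rightDS, $"left._1" === $"right._1", "left_outer")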

Thanks,
-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Dataset Outer Join vs RDD Outer Join

2016-06-01 Thread Richard Marscher
Ah thanks, I missed seeing the PR for
https://issues.apache.org/jira/browse/SPARK-15441. If the rows became null
objects then I can implement methods that will map those back to results
that align closer to the RDD interface.

As a follow on, I'm curious about thoughts regarding enriching the Dataset
join interface versus a package or users sugaring for themselves. I haven't
considered the implications of what the optimizations datasets, tungsten,
and/or bytecode gen can do now regarding joins so I may be missing a
critical benefit there around say avoiding Options in favor of nulls. If
nothing else, I guess Option doesn't have a first class Encoder or DataType
yet and maybe for good reasons.

I did find the RDD join interface elegant, though. In an ideal world an API
comparable to the following would be nice:
https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06


On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust 
wrote:

> Thanks for the feedback.  I think this will address at least some of the
> problems you are describing: https://github.com/apache/spark/pull/13425
>
> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher  > wrote:
>
>> Hi,
>>
>> I've been working on transitioning from RDD to Datasets in our codebase
>> in anticipation of being able to leverage features of 2.0.
>>
>> I'm having a lot of difficulties with the impedance mismatches between
>> how outer joins worked with RDD versus Dataset. The Dataset joins feel like
>> a big step backwards IMO. With RDD, leftOuterJoin would give you Option
>> types of the results from the right side of the join. This follows
>> idiomatic Scala avoiding nulls and was easy to work with.
>>
>> Now with Dataset there is only joinWith where you specify the join type,
>> but it lost all the semantics of identifying missing data from outer joins.
>> I can write some enriched methods on Dataset with an implicit class to
>> abstract messiness away if Dataset nulled out all mismatching data from an
>> outer join, however the problem goes even further in that the values aren't
>> always null. Integer, for example, defaults to -1 instead of null. Now it's
>> completely ambiguous what data in the join was actually there versus
>> populated via this atypical semantic.
>>
>> Are there additional options available to work around this issue? I can
>> convert to RDD and back to Dataset but that's less than ideal.
>>
>> Thanks,
>> --
>> *Richard Marscher*
>> Senior Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog
>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
>> Facebook <http://facebook.com/localytics> | LinkedIn
>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>
>
>


-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Dataset Outer Join vs RDD Outer Join

2016-06-06 Thread Richard Marscher
A quick unit test attempt didn't get far replacing map with as[]. I'm only
working against 1.6.1 at the moment, though; I was going to try 2.0, but I'm
having a hard time building a working spark-sql jar from source, as the only
ones I've managed to make are intended for the full assembly fat jar.


Example of the error from calling joinWith as left_outer and then
.as[(Option[T], U)], where T and U are both Int.

[info] newinstance(class scala.Tuple2,decodeusingserializer(input[0,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class
scala.Tuple2),None)
[info] :- decodeusingserializer(input[0,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true)
[info] :  +- input[0, StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))]
[info] +- decodeusingserializer(input[1,
StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))],scala.Option,true)
[info]+- input[1, StructType(StructField(_1,IntegerType,true),
StructField(_2,IntegerType,true))]

Cause: java.util.concurrent.ExecutionException: java.lang.Exception: failed
to compile: org.codehaus.commons.compiler.CompileException: File
'generated.java', Line 32, Column 60: No applicable constructor/method
found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
candidates are: "public static java.nio.ByteBuffer
java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
java.nio.ByteBuffer.wrap(byte[], int, int)"

The generated code is passing InternalRow objects into the ByteBuffer

Starting from two Datasets of types Dataset[(Int, Int)] with expression
$"left._1" === $"right._1". I'll have to spend some time getting a better
understanding of this analysis phase, but hopefully I can come up with
something.

On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust 
wrote:

> Option should place nicely with encoders, but its always possible there
> are bugs.  I think those function signatures are slightly more expensive
> (one extra object allocation) and its not as java friendly so we probably
> don't want them to be the default.
>
> That said, I would like to enable that kind of sugar while still taking
> advantage of all the optimizations going on under the covers.  Can you get
> it to work if you use `as[...]` instead of `map`?
>
> On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher <
> rmarsc...@localytics.com> wrote:
>
>> Ah thanks, I missed seeing the PR for
>> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became
>> null objects then I can implement methods that will map those back to
>> results that align closer to the RDD interface.
>>
>> As a follow on, I'm curious about thoughts regarding enriching the
>> Dataset join interface versus a package or users sugaring for themselves. I
>> haven't considered the implications of what the optimizations datasets,
>> tungsten, and/or bytecode gen can do now regarding joins so I may be
>> missing a critical benefit there around say avoiding Options in favor of
>> nulls. If nothing else, I guess Option doesn't have a first class Encoder
>> or DataType yet and maybe for good reasons.
>>
>> I did find the RDD join interface elegant, though. In the ideal world an
>> API comparable the following would be nice:
>> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>>
>>
>> On Wed, Jun 1, 2016 at 1:42 PM, Michael Armbrust 
>> wrote:
>>
>>> Thanks for the feedback.  I think this will address at least some of the
>>> problems you are describing: https://github.com/apache/spark/pull/13425
>>>
>>> On Wed, Jun 1, 2016 at 9:58 AM, Richard Marscher <
>>> rmarsc...@localytics.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been working on transitioning from RDD to Datasets in our codebase
>>>> in anticipation of being able to leverage features of 2.0.
>>>>
>>>> I'm having a lot of difficulties with the impedance mismatches between
>>>> how outer joins worked with RDD versus Dataset. The Dataset joins feel like
>>>> a big step backwards IMO. With RDD, leftOuterJoin would give you Option
>>>> types of the results from the right side of the join. This follows
>>>> idiomatic Scala avoiding nulls and was easy to work with.
>>>>
>>>> Now with Dataset there is only joinWith where you specify the join
>>>> type, but it los

Re: Dataset Outer Join vs RDD Outer Join

2016-06-07 Thread Richard Marscher
For anyone following along: the chain went private for a bit, but there were
still issues with the bytecode generation in the 2.0-preview, so this JIRA was
created: https://issues.apache.org/jira/browse/SPARK-15786

On Mon, Jun 6, 2016 at 1:11 PM, Michael Armbrust 
wrote:

> That kind of stuff is likely fixed in 2.0.  If you can get a reproduction
> working there it would be very helpful if you could open a JIRA.
>
> On Mon, Jun 6, 2016 at 7:37 AM, Richard Marscher  > wrote:
>
>> A quick unit test attempt didn't get far replacing map with as[], I'm
>> only working against 1.6.1 at the moment though, I was going to try 2.0 but
>> I'm having a hard time building a working spark-sql jar from source, the
>> only ones I've managed to make are intended for the full assembly fat jar.
>>
>>
>> Example of the error from calling joinWith as left_outer and then
>> .as[(Option[T], U]) where T and U are Int and Int.
>>
>> [info] newinstance(class scala.Tuple2,decodeusingserializer(input[0,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true),decodeusingserializer(input[1,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true),false,ObjectType(class
>> scala.Tuple2),None)
>> [info] :- decodeusingserializer(input[0,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true)
>> [info] :  +- input[0, StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))]
>> [info] +- decodeusingserializer(input[1,
>> StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))],scala.Option,true)
>> [info]+- input[1, StructType(StructField(_1,IntegerType,true),
>> StructField(_2,IntegerType,true))]
>>
>> Cause: java.util.concurrent.ExecutionException: java.lang.Exception:
>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 32, Column 60: No applicable constructor/method
>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>> candidates are: "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>
>> The generated code is passing InternalRow objects into the ByteBuffer
>>
>> Starting from two Datasets of types Dataset[(Int, Int)] with expression
>> $"left._1" === $"right._1". I'll have to spend some time getting a better
>> understanding of this analysis phase, but hopefully I can come up with
>> something.
>>
>> On Wed, Jun 1, 2016 at 3:43 PM, Michael Armbrust 
>> wrote:
>>
>>> Option should place nicely with encoders, but its always possible there
>>> are bugs.  I think those function signatures are slightly more expensive
>>> (one extra object allocation) and its not as java friendly so we probably
>>> don't want them to be the default.
>>>
>>> That said, I would like to enable that kind of sugar while still taking
>>> advantage of all the optimizations going on under the covers.  Can you get
>>> it to work if you use `as[...]` instead of `map`?
>>>
>>> On Wed, Jun 1, 2016 at 11:59 AM, Richard Marscher <
>>> rmarsc...@localytics.com> wrote:
>>>
>>>> Ah thanks, I missed seeing the PR for
>>>> https://issues.apache.org/jira/browse/SPARK-15441. If the rows became
>>>> null objects then I can implement methods that will map those back to
>>>> results that align closer to the RDD interface.
>>>>
>>>> As a follow-on, I'm curious about thoughts regarding enriching the
>>>> Dataset join interface versus leaving it to a package or to users sugaring it
>>>> for themselves. I haven't thought through the implications of the optimizations
>>>> that Datasets, Tungsten, and/or bytecode generation can apply to joins, so I may
>>>> be missing a critical benefit there around, say, avoiding Options in favor of
>>>> nulls. If nothing else, I guess Option doesn't have a first-class Encoder
>>>> or DataType yet, and maybe for good reasons.
>>>>
>>>> I did find the RDD join interface elegant, though. In an ideal world,
>>>> an API comparable to the following would be nice:
>>>> https://gist.github.com/rmarsch/3ea78b3a9a8a0e83ce162ed947fcab06
>>>>
>>>>
>>>> On Wed, Jun 1, 2016 at 1:42 PM, Michael Armb

Re: Dataset - reduceByKey

2016-06-07 Thread Richard Marscher
There certainly are some gaps between the richness of the RDD API and the
Dataset API. I'm also migrating from RDD to Dataset and ran into
reduceByKey and join scenarios.

On the spark-dev list, someone was discussing reduceByKey being
sub-optimal at the moment, and it spawned this JIRA:
https://issues.apache.org/jira/browse/SPARK-15598. But you might be able to
get by with groupBy().reduce() for now; check the performance, though.
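
For example, something along these lines (a minimal sketch against the 1.6
Dataset API, assuming a sqlContext is in scope; in 2.0 the equivalent calls
are groupByKey/reduceGroups):

import sqlContext.implicits._

val ds = Seq(("a", 1), ("a", 2), ("b", 3)).toDS()

// reduceByKey stand-in: group on the key, then reduce whole records per group.
// The result is a Dataset[(String, (String, Int))] of key -> reduced record.
val reduced = ds.groupBy(_._1).reduce((l, r) => (l._1, l._2 + r._2))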

As for join, the approach would be using the joinWith function on Dataset.
Although the API isn't as sugary as it was for RDD IMO, something which
I've been discussing in a separate thread as well. I can't find a weblink
for it but the thread subject is "Dataset Outer Join vs RDD Outer Join".

On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey 
wrote:

> It would also be nice if there was a better example of joining two
> Datasets. I am looking at the documentation here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
> a little bit sparse - is there a better documentation source?
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey 
> wrote:
>
>> Hello.
>>
>> I am looking at the option of moving RDD based operations to Dataset
>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>> What would the equivalent be in the Dataset interface - I do not see a
>> simple reduceByKey replacement.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>


-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: user threads in executors

2015-07-21 Thread Richard Marscher
You can certainly create threads in a map transformation. We do this for
concurrent DB lookups within one stage, for example. I would recommend,
however, that you switch from map to mapPartitions, as this allows you to
create a fixed-size thread pool shared across the items in a partition,
as opposed to spawning a future per record in the RDD.
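
A rough sketch of that pattern (the RDD contents, the pool size, the `sc`
SparkContext, and the postEvent stand-in for the ~90-100 ms POST call are all
placeholders):

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Stand-in for the external POST call that takes ~90-100 ms.
def postEvent(event: String): Int = { Thread.sleep(100); 200 }

val events = sc.parallelize(Seq("e1", "e2", "e3", "e4"))  // placeholder RDD

val statuses = events.mapPartitions { iter =>
  // One fixed-size pool shared by all records in this partition, instead of
  // a future per record across the whole RDD.
  val pool = Executors.newFixedThreadPool(8)
  implicit val ec = ExecutionContext.fromExecutorService(pool)
  // Materialize the futures to submit all calls, then wait for the results.
  val results = iter.toList.map(e => Future(postEvent(e))).map(Await.result(_, 30.seconds))
  pool.shutdown()
  results.iterator
}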

On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora 
wrote:

> Hi
>
> Can I create user threads in executors.
> I have a streaming app where after processing I have a requirement to push
> events to external system . Each post request costs ~90-100 ms.
>
> To make post parllel, I can not use same thread because that is limited by
> no of cores available in system , can I useuser therads in spark App? I
> tried to create 2 thredas in a map tasks and it worked.
>
> Is there any upper limit on no of user threds in spark executor ? Is it a
> good idea to create user threads in spark map task?
>
> Thanks
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Richard Marscher
Yes it does; in fact, it's probably going to be one of the more expensive
shuffles you could trigger.

On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu 
wrote:

> Does RDD.cartesian involve shuffling?
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: How to increase parallelism of a Spark cluster?

2015-08-04 Thread Richard Marscher
 in multiple threads. This
>>> has the benefit of isolating applications from each other, on both the
>>> scheduling side (each driver schedules its own tasks) and executor side
>>> (tasks from different applications run in different JVMs).
>>
>>
>> Regarding hitting the max number of requests, thanks for the link. I am
>> using the default client. Just peeked at the Solr code, and the default
>> settings (if no HttpClient instance is supplied in the ctor) is to use
>> DefaultHttpClient (from HttpComponents) whose settings are as follows:
>>
>>>
>>>- Version: HttpVersion.HTTP_1_1
>>>
>>>
>>>- ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
>>>
>>>
>>>- NoTcpDelay: true
>>>
>>>
>>>- SocketBufferSize: 8192
>>>
>>>
>>>- UserAgent: Apache-HttpClient/release (java 1.5)
>>>
>>> In addition, the Solr code sets the following additional config
>> parameters on the DefaultHttpClient.
>>
>>   params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
>>>   params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
>>>   params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);
>>
>> Since all my connections are coming out of 2 worker boxes, it looks like
>> I could get 32x2 = 64 clients hitting Solr, right?
>>
>> @Steve: Thanks for the link to the HttpClient config. I was thinking
>> about using a thread pool (or better using a PoolingHttpClientManager per
>> the docs), but it probably won't help since its still being fed one request
>> at a time.
>> @Abhishek: my observations agree with what you said. In the past I have
>> had success with repartition to reduce the partition size especially when
>> groupBy operations were involved. But I believe an executor should be able
>> to handle multiple tasks in parallel from what I understand about Akka on
>> which Spark is built - the worker is essentially an ActorSystem which can
>> contain multiple Actors, each actor works on a queue of tasks. Within an
>> Actor everything is sequential, but the ActorSystem is responsible for
>> farming out tasks it gets to each of its Actors. Although it is possible I
>> could be generalizing incorrectly from my limited experience with Akka.
>>
>> Thanks again for all your help. Please let me know if something jumps out
>> and/or if there is some configuration I should check.
>>
>> -sujit
>>
>>
>>
>> On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh <
>> abhis...@tetrationanalytics.com> wrote:
>>
>>> I don't know if (your assertion/expectation that) workers will process
>>> things (multiple partitions) in parallel is really valid. Or if having more
>>> partitions than workers will necessarily help (unless you are memory bound
>>> - so partitions is essentially helping your work size rather than execution
>>> parallelism).
>>>
>>> [Disclaimer: I am no authority on Spark, but wanted to throw my spin
>>> based my own understanding].
>>>
>>> Nothing official about it :)
>>>
>>> -abhishek-
>>>
>>> On Jul 31, 2015, at 1:03 PM, Sujit Pal  wrote:
>>>
>>> Hello,
>>>
>>> I am trying to run a Spark job that hits an external webservice to get
>>> back some information. The cluster is 1 master + 4 workers, each worker has
>>> 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
>>> and is accessed using code similar to that shown below.
>>>
>>> def getResults(keyValues: Iterator[(String, Array[String])]):
>>>> Iterator[(String, String)] = {
>>>> val solr = new HttpSolrClient()
>>>> initializeSolrParameters(solr)
>>>> keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
>>>> }
>>>> myRDD.repartition(10)
>>>
>>>  .mapPartitions(keyValues => getResults(keyValues))
>>>>
>>>
>>> The mapPartitions does some initialization to the SolrJ client per
>>> partition and then hits it for each record in the partition via the
>>> getResults() call.
>>>
>>> I repartitioned in the hope that this will result in 10 clients hitting
>>> Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
>>> clients if I can). However, I counted the number of open connections using
>>> "netstat -anp | grep ":8983.*ESTABLISHED" in a loop on the Solr box and
>>> observed that Solr has a constant 4 clients (ie, equal to the number of
>>> workers) over the lifetime of the run.
>>>
>>> My observation leads me to believe that each worker processes a single
>>> stream of work sequentially. However, from what I understand about how
>>> Spark works, each worker should be able to process number of tasks
>>> parallelly, and that repartition() is a hint for it to do so.
>>>
>>> Is there some SparkConf environment variable I should set to increase
>>> parallelism in these workers, or should I just configure a cluster with
>>> multiple workers per machine? Or is there something I am doing wrong?
>>>
>>> Thank you in advance for any pointers you can provide.
>>>
>>> -sujit
>>>
>>>
>>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Does RDD.cartesian involve shuffling?

2015-08-04 Thread Richard Marscher
That is the only alternative I'm aware of. If either A or B is small
enough to broadcast, then you'd at least be doing the cartesian products all
locally without needing to also transmit and shuffle A. Unless Spark
somehow optimizes the cartesian product and only transfers the smaller RDD
across the network in the shuffle, but I don't have reason to believe that's
true.

I'd try the cartesian first if you haven't tried at all, just to make sure
it actually is too slow before getting tricky with the broadcast.
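
A minimal sketch of the broadcast route, in case it helps (the RDDs, the
pairwise `score` function, and `sc` are placeholders, and it assumes B really
is small enough to collect):

// Stand-in for whatever you compute per (a, b) pair.
def score(x: Int, y: Int): Double = (x * y).toDouble

val a = sc.parallelize(1 to 1000)
val b = sc.parallelize(1 to 50)

// Collect and broadcast the small side; every pair is then formed locally on
// the executors, so only a's partitions move with their tasks.
val bSmall = sc.broadcast(b.collect())
val pairs = a.flatMap(x => bSmall.value.map(y => ((x, y), score(x, y))))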

On Tue, Aug 4, 2015 at 12:25 PM, Meihua Wu 
wrote:

> Thanks, Richard!
>
> I basically have two RDD's: A and B; and I need to compute a value for
> every pair of (a, b) for a in A and b in B. My first thought is
> cartesian, but involves expensive shuffle.
>
> Any alternatives? How about I convert B to an array and broadcast it
> to every node (assuming B is relative small to fit)?
>
>
>
> On Tue, Aug 4, 2015 at 8:23 AM, Richard Marscher
>  wrote:
> > Yes it does, in fact it's probably going to be one of the more expensive
> > shuffles you could trigger.
> >
> > On Mon, Aug 3, 2015 at 12:56 PM, Meihua Wu  >
> > wrote:
> >>
> >> Does RDD.cartesian involve shuffling?
> >>
> >> Thanks!
> >>
> >> -----
> >> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >
> >
> >
> > --
> > Richard Marscher
> > Software Engineer
> > Localytics
> > Localytics.com | Our Blog | Twitter | Facebook | LinkedIn
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Repartition question

2015-08-04 Thread Richard Marscher
Hi,

it is possible to control the number of partitions for the RDD without
calling repartition by setting the max split size for the Hadoop input
format used. Tracing through the code, XmlInputFormat extends
FileInputFormat, which determines the number of splits (which NewHadoopRDD
uses to determine the number of partitions:
https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L95)
with a few configs:
https://github.com/apache/hadoop/blob/branch-2.3/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java#L200

> public static final String SPLIT_MAXSIZE =
>     "mapreduce.input.fileinputformat.split.maxsize";
> public static final String SPLIT_MINSIZE =
>     "mapreduce.input.fileinputformat.split.minsize";
If you are setting SparkConf fields, prefix the keys with spark.hadoop and
they will end up on the Hadoop conf used for the above values.
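
For example (a sketch; the 8 MB value is only an illustration, pick whatever
split size gives you the partition count you want):

import org.apache.spark.{SparkConf, SparkContext}

// Cap each input split at 8 MB so a 100 MB file yields ~13 partitions
// instead of ~4, without calling repartition().
val conf = new SparkConf()
  .setAppName("wikipedia-parse")
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize",
       (8 * 1024 * 1024).toString)

val sc = new SparkContext(conf)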

On Tue, Aug 4, 2015 at 12:31 AM, Naveen Madhire 
wrote:

> Hi All,
>
> I am running the WikiPedia parsing example present in the "Advance
> Analytics with Spark" book.
>
>
> https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112
>
>
> The partitions of the RDD returned by the readFile function (mentioned
> above) is of 32MB size. So if my file size is 100 MB, RDD is getting
> created with 4 partitions with approx 32MB  size.
>
>
> I am running this in a standalone spark cluster mode, every thing is
> working fine only little confused about the nbr of partitions and the size.
>
> I want to increase the nbr of partitions for the RDD to make use of the
> cluster. Is calling repartition() after this the only option or can I pass
> something in the above method to have more partitions of the RDD.
>
> Please let me know.
>
> Thanks.
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Richard Marscher
Not that I'm aware of. We ran into a similar "issue" where we didn't want
to keep accumulating all these empty part files in storage on S3 or HDFS.
There didn't seem to be any performance-free way to do it with an RDD, so
we just run a non-Spark post-batch operation to delete empty files from the
write path.
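
That post-batch cleanup is just a small bit of Hadoop FileSystem code, roughly
like this (the output path and the part-file name filter are assumptions):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Delete zero-length part files left behind under the write path.
val outputPath = new Path("hdfs:///data/output/run-001")  // hypothetical path
val fs = FileSystem.get(outputPath.toUri, new Configuration())

fs.listStatus(outputPath)
  .filter(s => s.isFile && s.getLen == 0 && s.getPath.getName.startsWith("part-"))
  .foreach(s => fs.delete(s.getPath, false))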

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin 
wrote:

> Currently, I use rdd.isEmpty()
>
> Thanks,
> Patanachai
>
>
>
> On 08/06/2015 12:02 PM, gpatcham wrote:
>
>> Is there a way to filter out empty partitions before I write to HDFS other
>> than using reparition and colasce ?
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
> --
> Patanachai
>
>
>
> -----
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-08 Thread Richard Marscher
Hi,

what is the reasoning behind the use of `coalesce(1,false)`? This says to
collapse all the data into a single partition, which must fit in memory on
one node in the Spark cluster; if the cluster has more than one node, the
data still all has to be moved onto that one node. It doesn't seem like the
following map or union necessitates coalesce, but the use case is not clear
to me.

On Fri, Sep 4, 2015 at 12:29 PM, unk1102  wrote:

> Hi I have Spark job which does some processing on ORC data and stores back
> ORC data using DataFrameWriter save() API introduced in Spark 1.4.0. I have
> the following piece of code which is using heavy shuffle memory. How do I
> optimize below code? Is there anything wrong with it? It is working fine as
> expected only causing slowness because of GC pause and shuffles lots of
> data
> so hitting memory issues. Please guide I am new to Spark. Thanks in
> advance.
>
> JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1,
> false).map(new Function<Row, Row>() {
>    @Override
>    public Row call(Row row) throws Exception {
>      List<Object> rowAsList;
>      Row row1 = null;
>      if (row != null) {
>        rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>        row1 = RowFactory.create(rowAsList.toArray());
>      }
>      return row1;
>    }
> }).union(modifiedRDD);
> DataFrame updatedDataFrame =
> hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
>
> updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity",
> "date").save("baseTable");
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: New to Spark - Paritioning Question

2015-09-08 Thread Richard Marscher
That seems like it could work, although I don't think `partitionByKey` is a
thing, at least for RDD. You might be able to merge step #2 and step #3
into one step by using the `reduceByKey` function signature that takes in a
Partitioner implementation.

def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]

Merge the values for each key using an associative reduce function. This
will also perform the merging locally on each mapper before sending results
to a reducer, similarly to a "combiner" in MapReduce.

The tricky part might be getting the partitioner to know about the number
of partitions, which I think it needs to know upfront in `abstract def
numPartitions: Int`. The `HashPartitioner` for example takes in the number
as a constructor argument, maybe you could use that with an upper bound
size if you don't mind empty partitions. Otherwise you might have to mess
around to extract the exact number of keys if it's not readily available.
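
As a concrete sketch (the mine IDs and values are made up, and `sc` is
assumed), merging steps 2 and 3 might look like:

import org.apache.spark.HashPartitioner

val byMine = sc.parallelize(Seq((1, 10.0), (1, 12.5), (2, 7.0)))  // (mineId, value)

// Upper-bound partition count; empty partitions are acceptable.
val numPartitions = 64

// Aggregates per key and places each key in a known partition in one pass.
val flexed = byMine.reduceByKey(new HashPartitioner(numPartitions), (a, b) => a + b)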

Aside: what is the requirement to have each partition only contain the data
related to one key?

On Fri, Sep 4, 2015 at 11:06 AM, mmike87  wrote:

> Hello, I am new to Apache Spark and this is my company's first Spark
> project.
> Essentially, we are calculating models dealing with Mining data using
> Spark.
>
> I am holding all the source data in a persisted RDD that we will refresh
> periodically. When a "scenario" is passed to the Spark job (we're using Job
> Server) the persisted RDD is filtered to the relevant mines. For example,
> we
> may want all mines in Chile and the 1990-2015 data for each.
>
> Many of the calculations are cumulative, that is when we apply user-input
> "adjustment factors" to a value, we also need the "flexed" value we
> calculated for that mine previously.
>
> To ensure that this works, the idea if to:
>
> 1) Filter the superset to relevant mines (done)
> 2) Group the subset by the unique identifier for the mine. So, a group may
> be all the rows for mine "A" for 1990-2015
> 3) I then want to ensure that the RDD is partitioned by the Mine Identifier
> (and Integer).
>
> It's step 3 that is confusing me. I suspect it's very easy ... do I simply
> use PartitionByKey?
>
> We're using Java if that makes any difference.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/New-to-Spark-Paritioning-Question-tp24580.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: New to Spark - Paritioning Question

2015-09-09 Thread Richard Marscher
Ah, I see. In that case, the groupByKey function does guarantee that every key
ends up on exactly one partition, together with its aggregated data. This can be
improved depending on what you want to do afterwards. Group by key only
aggregates the data after shipping it across the cluster, whereas
reduceByKey does the aggregation on each node first and then ships those partial
results to the final node and partition to finish the aggregation there, if that
makes sense.

So say Node 1 has pairs: (a, 1), (b, 2), (b, 6)
Node 2 has pairs: (a, 2), (a, 3), (b, 4)

Group by would ship both the a pairs and the b pairs across the network. If you
did reduce with sum as the aggregate, you'd expect it to ship (b, 8)
from Node 1 or (a, 5) from Node 2, since the local aggregation happened first.

You are correct that work with expensive side effects, like writing to a
database (connections plus network I/O), is best done with the mapPartitions
or foreachPartition style of RDD functions, so you can share a database
connection and also potentially batch statements.

On Tue, Sep 8, 2015 at 7:37 PM, Mike Wright  wrote:

> Thanks for the response!
>
> Well, in retrospect each partition doesn't need to be restricted to a
> single key. But, I cannot have values associated with a key span partitions
> since they all need to be processed together for a key to facilitate
> cumulative calcs. So provided an individual key has all its values in a
> single partition, I'm OK.
>
> Additionally, the values will be written to the database, and from what I
> have read doing this at the partition level is the best compromise between
> 1) Writing the calculated values for each key (lots of connect/disconnects)
> and collecting them all at the end and writing them all at once.
>
> I am using a groupBy against the filtered RDD the get the grouping I want,
> but apparently this may not be the most efficient way, and it seems that
> everything is always in a single partition under this scenario.
>
>
> ___
>
> *Mike Wright*
> Principal Architect, Software Engineering
>
> SNL Financial LC
> 434-951-7816 *p*
> 434-244-4466 *f*
> 540-470-0119 *m*
>
> mwri...@snl.com
>
> On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher  > wrote:
>
>> That seems like it could work, although I don't think `partitionByKey` is
>> a thing, at least for RDD. You might be able to merge step #2 and step #3
>> into one step by using the `reduceByKey` function signature that takes in a
>> Partitioner implementation.
>>
>> def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
>>
>> Merge the values for each key using an associative reduce function. This
>> will also perform the merging locally on each mapper before sending results
>> to a reducer, similarly to a "combiner" in MapReduce.
>>
>> The tricky part might be getting the partitioner to know about the number
>> of partitions, which I think it needs to know upfront in `abstract def
>> numPartitions: Int`. The `HashPartitioner` for example takes in the
>> number as a constructor argument, maybe you could use that with an upper
>> bound size if you don't mind empty partitions. Otherwise you might have to
>> mess around to extract the exact number of keys if it's not readily
>> available.
>>
>> Aside: what is the requirement to have each partition only contain the
>> data related to one key?
>>
>> On Fri, Sep 4, 2015 at 11:06 AM, mmike87  wrote:
>>
>>> Hello, I am new to Apache Spark and this is my company's first Spark
>>> project.
>>> Essentially, we are calculating models dealing with Mining data using
>>> Spark.
>>>
>>> I am holding all the source data in a persisted RDD that we will refresh
>>> periodically. When a "scenario" is passed to the Spark job (we're using
>>> Job
>>> Server) the persisted RDD is filtered to the relevant mines. For
>>> example, we
>>> may want all mines in Chile and the 1990-2015 data for each.
>>>
>>> Many of the calculations are cumulative, that is when we apply user-input
>>> "adjustment factors" to a value, we also need the "flexed" value we
>>> calculated for that mine previously.
>>>
>>> To ensure that this works, the idea if to:
>>>
>>> 1) Filter the superset to relevant mines (done)
>>> 2) Group the subset by the unique identifier for the mine. So, a gro

Re: What should be the optimal value for spark.sql.shuffle.partition?

2015-09-09 Thread Richard Marscher
Do you have any details about the cluster you are running this against? The
memory per executor/node, the number of executors, and so on? Even at a shuffle
setting of 1000, that would be roughly 1 GB per partition, assuming the 1 TB of
data includes the JVM overheads. Maybe try another order of magnitude more
shuffle partitions and see where that gets you?
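
For reference, bumping it is just a config change before the query runs (the
value here is only an example):

// Using the hiveContext the job already has:
hiveContext.setConf("spark.sql.shuffle.partitions", "2000")

// or equivalently via SQL:
hiveContext.sql("SET spark.sql.shuffle.partitions=2000")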

On Tue, Sep 1, 2015 at 12:11 PM, unk1102  wrote:

> Hi I am using Spark SQL actually hiveContext.sql() which uses group by
> queries and I am running into OOM issues. So thinking of increasing value
> of
> spark.sql.shuffle.partition from 200 default to 1000 but it is not helping.
> Please correct me if I am wrong this partitions will share data shuffle
> load
> so more the partitions less data to hold. Please guide I am new to Spark. I
> am using Spark 1.4.0 and I have around 1TB of uncompressed data to process
> using hiveContext.sql() group by queries.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: spark.shuffle.spill=false ignored?

2015-09-09 Thread Richard Marscher
Hi Eric,

I just wanted to do a sanity check: do you know what paths it is trying to
write to? I ask because even without spilling, shuffles always write to
disk before transferring data across the network. I encountered this myself
at one point, where we accidentally had /tmp mounted on a tiny disk and kept
running out of disk space on shuffles even though we also don't spill. You
may have already considered or ruled this out, though.
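
If it does turn out to be the shuffle scratch space, pointing the scratch
directories at a bigger volume is one fix (the path is an example, and note
that on YARN/EMR the local dirs may be governed by yarn.nodemanager.local-dirs
instead):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.local.dir", "/mnt/big-disk/spark-tmp")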

On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker  wrote:

> Hi,
>
> I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to
> run a large pyspark job several times, specifying
> `spark.shuffle.spill=false` in different ways.  It seems that the setting
> is ignored, at least partially, and some of the tasks start spilling large
> amounts of data to disk.  The job has been fast enough in the past, but
> once it starts spilling to disk it lands on Miller's planet [1].
>
> Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
> could there be an incompatible setting that is overriding
> `spark.shuffle.spill=false`?  Is it something that goes back to Spark
> 1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
> to continue on for a while, I've started to see Kryo stack traces in the
> tasks that are spilling to disk.  The stack traces mention there not being
> enough disk space, although a `df` shows plenty of space (perhaps after the
> fact, when temporary files have been cleaned up).
>
> Has anyone run into something like this before?  I would be happy to see
> OOM errors, because that would be consistent with one understanding of what
> might be going on, but I haven't yet.
>
> Eric
>
>
> [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk&safe=active
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: What should be the optimal value for spark.sql.shuffle.partition?

2015-09-09 Thread Richard Marscher
I see you reposted with more details:

"I have 2 TB of
skewed data to process and then convert rdd into dataframe and use it as
table in hiveContext.sql(). I am using 60 executors with 20 GB memory and 4
cores."

If I'm reading that correctly, you have 2 TB of data and 1.2 TB of memory in
the cluster. I think that's a fundamental problem up front, and if the data is
skewed the aggregation will be even worse. To start, I think the data either
needs to be broken down or the cluster upgraded, unfortunately.

On Wed, Sep 9, 2015 at 5:41 PM, Richard Marscher 
wrote:

> Do you have any details about the cluster you are running this against?
> The memory per executor/node, number of executors, and such? Even at a
> shuffle setting of 1000 that would be roughly 1GB per partition assuming
> the 1TB of data includes overheads in the JVM. Maybe try another order of
> magnitude higher for number of shuffle partitions and see where that gets
> you?
>
> On Tue, Sep 1, 2015 at 12:11 PM, unk1102  wrote:
>
>> Hi I am using Spark SQL actually hiveContext.sql() which uses group by
>> queries and I am running into OOM issues. So thinking of increasing value
>> of
>> spark.sql.shuffle.partition from 200 default to 1000 but it is not
>> helping.
>> Please correct me if I am wrong this partitions will share data shuffle
>> load
>> so more the partitions less data to hold. Please guide I am new to Spark.
>> I
>> am using Spark 1.4.0 and I have around 1TB of uncompressed data to process
>> using hiveContext.sql() group by queries.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -----
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Option Encoder

2016-06-23 Thread Richard Marscher
Is there a proper way to make or get an Encoder for Option in Spark 2.0?
There isn't one by default and while ExpressionEncoder from catalyst will
work, it is private and unsupported.
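
For context, what we're doing today looks roughly like this, leaning on the
internal API mentioned above (Option[Int] is just an example element type):

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// Works, but ExpressionEncoder is catalyst-internal and unsupported.
implicit val optionIntEncoder: Encoder[Option[Int]] = ExpressionEncoder[Option[Int]]()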

-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Local mode: Stages hang for minutes

2015-12-03 Thread Richard Marscher
Hi,

I'm doing some testing of workloads using local mode on a server. I see
weird behavior where a job is submitted to the application and it just
hangs for several minutes doing nothing. The stages are submitted as
pending, and in the application UI the stage view claims no tasks have been
submitted. Then, after a few minutes, things suddenly start and run
smoothly.

I'm running against tiny data sets the size of 10s to low 100s of items in
the RDD. I've been attaching with JProfiler, doing thread and heap dumps
but nothing is really standing out as to why Spark seems to periodically
pause for such a long time.

Has anyone else seen similar behavior or aware of some quirk of local mode
that could cause this kind of blocking?

-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local mode: Stages hang for minutes

2015-12-03 Thread Richard Marscher
I should add that the pauses are not from GC. Also, tracing the CPU
call tree in the JVM, it seems like nothing is doing any work; it just seems to
be idling or blocking.

On Thu, Dec 3, 2015 at 11:24 AM, Richard Marscher 
wrote:

> Hi,
>
> I'm doing some testing of workloads using local mode on a server. I see
> weird behavior where a job is submitted to the application and it just
> hangs for several minutes doing nothing. The stages are submitted as
> pending and in the application UI the stage view claims no tasks have been
> submitted. Suddenly after a few minutes things suddenly start and run
> smoothly.
>
> I'm running against tiny data sets the size of 10s to low 100s of items in
> the RDD. I've been attaching with JProfiler, doing thread and heap dumps
> but nothing is really standing out as to why Spark seems to periodically
> pause for such a long time.
>
> Has anyone else seen similar behavior or aware of some quirk of local mode
> that could cause this kind of blocking?
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local mode: Stages hang for minutes

2015-12-03 Thread Richard Marscher
It turns out I was only looking at the call tree for running threads. After
looking at blocked threads, I saw the job was spending hundreds of compute
hours blocked on jets3t calls to S3. It was traversing likely thousands, if
not hundreds of thousands, of S3 files accumulated over many rounds of load
testing. Cleaning up the files fixed the issue, and I'm pretty sure it's
already well known that the underlying s3n filesystem doesn't handle
traversing a large S3 file tree well when the sparkContext.textFile function
is used with a wildcard.

On Thu, Dec 3, 2015 at 12:57 PM, Ali Tajeldin EDU 
wrote:

> You can try to run "jstack" a couple of times while the app is hung to
> look for patterns  for where the app is hung.
> --
> Ali
>
>
> On Dec 3, 2015, at 8:27 AM, Richard Marscher 
> wrote:
>
> I should add that the pauses are not from GC and also in tracing the CPU
> call tree in the JVM it seems like nothing is doing any work, just seems to
> be idling or blocking.
>
> On Thu, Dec 3, 2015 at 11:24 AM, Richard Marscher <
> rmarsc...@localytics.com> wrote:
>
>> Hi,
>>
>> I'm doing some testing of workloads using local mode on a server. I see
>> weird behavior where a job is submitted to the application and it just
>> hangs for several minutes doing nothing. The stages are submitted as
>> pending and in the application UI the stage view claims no tasks have been
>> submitted. Suddenly after a few minutes things suddenly start and run
>> smoothly.
>>
>> I'm running against tiny data sets the size of 10s to low 100s of items
>> in the RDD. I've been attaching with JProfiler, doing thread and heap dumps
>> but nothing is really standing out as to why Spark seems to periodically
>> pause for such a long time.
>>
>> Has anyone else seen similar behavior or aware of some quirk of local
>> mode that could cause this kind of blocking?
>>
>> --
>> *Richard Marscher*
>> Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog
>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
>> Facebook <http://facebook.com/localytics> | LinkedIn
>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>
>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Local Mode: Executor thread leak?

2015-12-07 Thread Richard Marscher
Hi,

I've been running benchmarks against Spark in local mode in a long-running
process. I'm seeing threads leak each time it runs a job. It doesn't
matter if I recycle the SparkContext constantly or have one context stay alive
for the entire application lifetime.

I see an ongoing accumulation of "pool-N-thread-1" threads whose creating
thread is "Executor task launch worker-xx", where N and xx are numbers. The
number of leaks per launch worker varies but is usually 1 to a few.

Searching the Spark code, the pool is created in the Executor class and is
shut down via `.shutdown()` in the executor's stop(). I've wired up logging and
also tried shutdownNow() and awaitTermination() on the pools. Everything seems
okay there for every Executor that has `stop()` called on it, but I'm still not
sure yet whether every Executor is stopped that way, which I am looking into now.

What I'm curious to know is if anyone has seen a similar issue?

-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local Mode: Executor thread leak?

2015-12-07 Thread Richard Marscher
Thanks for the response.

The version is Spark 1.5.2.

Some examples of the thread names:

pool-1061-thread-1
pool-1059-thread-1
pool-1638-thread-1

There become hundreds then thousands of these stranded in WAITING.

I added logging to try to track the lifecycle of the thread pool in
Executor, as mentioned before. Here is an excerpt; everything seems fine
there. Every executor that starts is also shut down, and it seems like it
shuts down cleanly.

15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor
driver. pool shut down
java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread
pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool size =
0, active threads = 0, queued tasks = 0, completed tasks = 0]
15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor
driver. pool shut down
java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread
pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool size =
0, active threads = 0, queued tasks = 0, completed tasks = 0]
15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor
driver. pool shut down
java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]

Also here is an example thread dump of such a thread:

"pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on
condition [0x7f0c33c3e000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x7f10b3e8fb60> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu  wrote:

> Which version are you using? Could you post these thread names here?
>
> Best Regards,
> Shixiong Zhu
>
> 2015-12-07 14:30 GMT-08:00 Richard Marscher :
>
>> Hi,
>>
>> I've been running benchmarks against Spark in local mode in a long
>> running process. I'm seeing threads leaking each time it runs a job. It
>> doesn't matter if I recycle SparkContext constantly or have 1 context stay
>> alive for the entire application lifetime.
>>
>> I see a huge accumulation ongoing of "pool--thread-1" threads with
>> the creating thread "Executor task launch worker-xx" where x's are numbers.
>> The number of leaks per launch worker varies but usually 1 to a few.
>>
>> Searching the Spark code the pool is created in the Executor class. It is
>> `.shutdown()` in the stop for the executor. I've wired up logging and also
>> tried shutdownNow() and awaitForTermination on the pools. Every seems okay
>> there for every Executor that is called with `stop()` but I'm still not
>> sure yet if every Executor is called as such, which I am looking into now.
>>
>> What I'm curious to know is if anyone has seen a similar issue?
>>
>> --
>> *Richard Marscher*
>> Software Engineer
>> Localytics
>> Localytics.com <http://localytics.com/> | Our Blog
>> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
>> Facebook <http://facebook.com/localytics> | LinkedIn
>> <http://www.linkedin.com/company/1148792?trk=tyah>
>>
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Local Mode: Executor thread leak?

2015-12-08 Thread Richard Marscher
Alright I was able to work through the problem.

So the owning thread was one of the executor task launch workers, which at
least in local mode run the task and the task's related user code.
After judiciously naming every thread in the pools in the user code (with a
custom ThreadFactory), I was able to trace the leak down to a couple of thread
pools that were not shut down properly, by noticing the named threads
accumulating in thread dumps of the JVM process.
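
In case it's useful to anyone else, the naming ThreadFactory is just a few
lines (a sketch; the pool name and size are whatever identifies your call site):

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory}

class NamedThreadFactory(poolName: String) extends ThreadFactory {
  private val counter = new AtomicInteger(0)
  override def newThread(r: Runnable): Thread = {
    // Named threads make leaked pools easy to spot in a jstack / thread dump.
    new Thread(r, s"$poolName-thread-${counter.incrementAndGet()}")
  }
}

val lookupPool = Executors.newFixedThreadPool(8, new NamedThreadFactory("db-lookup"))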

On Mon, Dec 7, 2015 at 6:41 PM, Richard Marscher 
wrote:

> Thanks for the response.
>
> The version is Spark 1.5.2.
>
> Some examples of the thread names:
>
> pool-1061-thread-1
> pool-1059-thread-1
> pool-1638-thread-1
>
> There become hundreds then thousands of these stranded in WAITING.
>
> I added logging to try to track the lifecycle of the thread pool in
> Executor as mentioned before. Here is an excerpt, but every seems fine
> there. Every executor that starts is also shut down and it seems like it
> shuts down fine.
>
> 15/12/07 23:30:21 WARN o.a.s.e.Executor: Threads finished in executor
> driver. pool shut down 
> java.util.concurrent.ThreadPoolExecutor@e5d036b[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
> 15/12/07 23:30:28 WARN o.a.s.e.Executor: Executor driver created, thread
> pool: java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Running, pool size
> = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> 15/12/07 23:31:06 WARN o.a.s.e.Executor: Threads finished in executor
> driver. pool shut down 
> java.util.concurrent.ThreadPoolExecutor@3bc41ae3[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 36]
> 15/12/07 23:31:11 WARN o.a.s.e.Executor: Executor driver created, thread
> pool: java.util.concurrent.ThreadPoolExecutor@6e85ece4[Running, pool size
> = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
> 15/12/07 23:34:35 WARN o.a.s.e.Executor: Threads finished in executor
> driver. pool shut down 
> java.util.concurrent.ThreadPoolExecutor@6e85ece4[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 288]
>
> Also here is an example thread dump of such a thread:
>
> "pool-493-thread-1" prio=10 tid=0x7f0e60612800 nid=0x18c4 waiting on
> condition [0x7f0c33c3e000]
>java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x7f10b3e8fb60> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> On Mon, Dec 7, 2015 at 6:23 PM, Shixiong Zhu  wrote:
>
>> Which version are you using? Could you post these thread names here?
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-12-07 14:30 GMT-08:00 Richard Marscher :
>>
>>> Hi,
>>>
>>> I've been running benchmarks against Spark in local mode in a long
>>> running process. I'm seeing threads leaking each time it runs a job. It
>>> doesn't matter if I recycle SparkContext constantly or have 1 context stay
>>> alive for the entire application lifetime.
>>>
>>> I see a huge accumulation ongoing of "pool--thread-1" threads with
>>> the creating thread "Executor task launch worker-xx" where x's are numbers.
>>> The number of leaks per launch worker varies but usually 1 to a few.
>>>
>>> Searching the Spark code the pool is created in the Executor class. It
>>> is `.shutdown()` in the stop for the executor. I've wired up logging and
>>> also tried shutdownNow() and awaitForTermination on the pools. Every seems
>>> okay there for every Executor that is called with `stop()` but I'm still
>>> not sure yet if every Executor is called as such, which I am looking into
>>> now.
>>>
>>> What I'm curious to know is if anyone has seen a similar issue?
>>>
>>> --
>>> *Richard Marscher*
>>> Software Engineer
>>> Localytics
>>> Localytics.co