Re: PySpark API on top of Apache Arrow

2018-05-26 Thread Corey Nolet
Gourav & Nicholas,

Thank you! It does look like the PySpark Pandas UDF is exactly what I want,
and the article I read didn't mention that it uses Arrow underneath. It looks
like Wes McKinney was also a key part of building the Pandas UDF.
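
For anyone else who lands on this thread, here's a minimal, untested sketch of
the scalar Pandas UDF pattern (PySpark 2.3+); the column names and data are
made up for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("pandas-udf-sketch").getOrCreate()
df = spark.createDataFrame([(1, 2.0), (2, 3.5), (3, 7.25)], ["id", "value"])

@pandas_udf("double", PandasUDFType.SCALAR)
def plus_one(v):
    # Each invocation receives a whole column chunk as a pandas Series,
    # shipped to the Python worker as Arrow record batches.
    return v + 1

df.withColumn("value_plus_one", plus_one("value")).show()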

Gourav,

I totally apologize for my long and drawn out response to you. I initially
misunderstood your response. I also need to take the time to dive into the
PySpark source code- I was assuming that it was just firing up JVMs under
the hood.

Thanks again! I'll report back with findings.

On Sat, May 26, 2018 at 2:51 PM, Nicolas Paris <nipari...@gmail.com> wrote:

> hi Corey,
>
> I'm not familiar with Arrow or Plasma. However, I recently read an article
> about Spark on a standalone machine (your case). Sounds like you could
> benefit from PySpark "as-is":
>
> https://databricks.com/blog/2018/05/03/benchmarking-apache-spark-on-a-single-node-machine.html
>
> regards,
>
> 2018-05-23 22:30 GMT+02:00 Corey Nolet <cjno...@gmail.com>:
>
>> Please forgive me if this question has been asked already.
>>
>> I'm working in Python with Arrow+Plasma+Pandas Dataframes. I'm curious if
>> anyone knows of any efforts to implement the PySpark API on top of Apache
>> Arrow directly. In my case, I'm doing data science on a machine with 288
>> cores and 1TB of ram.
>>
>> It would make life much easier if I was able to use the flexibility of
>> the PySpark API (rather than having to be tied to the operations in
>> Pandas). It seems like an implementation would be fairly straightforward
>> using the Plasma server and object_ids.
>>
>> If you have not heard of an effort underway to accomplish this, any
>> reasons why it would be a bad idea?
>>
>>
>> Thanks!
>>
>
>


PySpark API on top of Apache Arrow

2018-05-23 Thread Corey Nolet
Please forgive me if this question has been asked already.

I'm working in Python with Arrow+Plasma+Pandas Dataframes. I'm curious if
anyone knows of any efforts to implement the PySpark API on top of Apache
Arrow directly. In my case, I'm doing data science on a machine with 288
cores and 1TB of ram.

It would make life much easier if I was able to use the flexibility of the
PySpark API (rather than having to be tied to the operations in Pandas). It
seems like an implementation would be fairly straightforward using the
Plasma server and object_ids.
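
Roughly the kind of plumbing I have in mind, as an untested sketch: it assumes
a plasma_store process is already running at the socket path below, and a
pyarrow version where plasma.connect() takes just the store socket and the
client exposes put()/get().

import numpy as np
import pyarrow.plasma as plasma

# Assumes a plasma_store is already running at this socket path; older
# pyarrow releases also want a manager socket argument on connect().
client = plasma.connect("/tmp/plasma")

data = {"x": np.arange(5), "y": np.arange(5) * 2.0}

# put() serializes the object into shared memory via Arrow and returns an
# ObjectID; any process attached to the same store can fetch it by that id.
object_id = client.put(data)

# A hypothetical PySpark-on-Arrow layer would ship only object_ids to worker
# processes, which would then get() the buffers out of shared memory.
fetched = client.get(object_id)
print(fetched["y"])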

If you have not heard of an effort underway to accomplish this, any reasons
why it would be a bad idea?


Thanks!


Re: Using MatrixFactorizationModel as a feature extractor

2017-11-27 Thread Corey Nolet
I know that the algorithm itself is not able to extract features for a user
that it was not trained on; however, I'm trying to find a way to compare
users for similarity so that when I find a user that's really similar to
another user, I can just use the similar user's recommendations until the
other user gets worked into the model.
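
For reference, a rough sketch of what I mean (hand-rolled, not an MLlib API for
this; the ratings, rank and user ids are made up): pull the latent factor
vectors out of the trained model and rank other users by cosine similarity. A
brand-new user would still need a stand-in vector, e.g. an average of the
factors of items they touched, since the model has no row for them.

import numpy as np
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, Rating

sc = SparkContext(appName="latent-factor-similarity")

ratings = sc.parallelize([Rating(0, 10, 4.0), Rating(0, 11, 1.0),
                          Rating(1, 10, 5.0), Rating(1, 11, 1.5),
                          Rating(2, 11, 3.0)])
model = ALS.train(ratings, rank=8, iterations=5)

# Latent factor vector for the user we want to match against
target = np.array(model.userFeatures().lookup(0)[0])

def cosine(a, b):
    a, b = np.asarray(a), np.asarray(b)
    return float(a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Rank every other trained user by similarity to the target user
most_similar = (model.userFeatures()
                .filter(lambda kv: kv[0] != 0)
                .map(lambda kv: (kv[0], cosine(target, kv[1])))
                .takeOrdered(3, key=lambda kv: -kv[1]))
print(most_similar)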

On Mon, Nov 27, 2017 at 3:08 PM, Corey Nolet <cjno...@gmail.com> wrote:

> I'm trying to use the MatrixFactorizationModel to, for instance, determine
> the latent factors of a user or item that were not used in the training
> data of the model. I'm not as concerned about the rating as I am with the
> latent factors for the user/item.
>
> Thanks!
>


Using MatrixFactorizationModel as a feature extractor

2017-11-27 Thread Corey Nolet
I'm trying to use the MatrixFactorizationModel to, for instance, determine
the latent factors of a user or item that were not used in the training
data of the model. I'm not as concerned about the rating as I am with the
latent factors for the user/item.

Thanks!


Re: Apache Flink

2016-04-17 Thread Corey Nolet
Peyman,

I'm sorry, I missed the comment that microbatching was a waste of time. Did
someone mention this? I know this thread got pretty long so I may have
missed it somewhere above.

My comment about Spark's microbatching being a downside is strictly in
reference to CEP. Complex CEP flows are reactive, and the batched streaming
technique that Spark's architecture utilizes does not lend itself easily to
programming real-time reactive designs. The thing is, many good streaming
engines start with just that, the streaming engine. They start at the core
with an architecture that generally promotes tuple-at-a-time processing.
Whatever they build on top of that is strictly just to make other use cases
easier to implement, hence the main difference between Flink and Spark.

Storm, Esper and InfoSphere Streams are three examples of this that come to
mind very quickly. All three of them are powerful tuple-at-a-time stream
processing engines under the hood, and all three also have abstractions
built on top of that core that make it easier to implement more specialized
and more batch-oriented processing paradigms. Flink is similar to this.

I hope you didn't take my comment as an attack- it was just that Spark's
microbatching does not follow, at its core, the traditional design that most
well-accepted stream processing frameworks have in the past. I am not implying
that microbatching is not useful in some use cases. What I am implying is that
it does not make real-time reactive environments very easy to implement.



On Sun, Apr 17, 2016 at 8:49 PM, Peyman Mohajerian <mohaj...@gmail.com>
wrote:

> Microbatching is certainly not a waste of time; you are making way too
> strong of a statement. In fact, in certain cases one tuple at a time
> makes no sense; it all depends on the use case. In fact, if you understand
> the history of the Storm project, you would know that microbatching was
> added later to Storm, as Trident, and it is specifically for
> microbatching/windowing.
> In certain cases you are doing aggregation/windowing and throughput is the
> dominant design consideration and you don't care what each individual
> event/tuple does, e.g. if you push different event types to separate Kafka
> topics and all you care about is doing a count, what is the need for
> single-event processing?
>
> On Sun, Apr 17, 2016 at 12:43 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> i have not been intrigued at all by the microbatching concept in Spark. I
>> am used to CEP in real streams processing environments like Infosphere
>> Streams & Storm where the granularity of processing is at the level of each
>> individual tuple and processing units (workers) can react immediately to
>> events being received and processed. The closest Spark streaming comes to
>> this concept is the notion of "state" that can be updated via the
>> "updateStateByKey()" functions, which are only able to be run in a
>> microbatch. Looking at the expected design changes to Spark Streaming in
>> Spark 2.0.0, it also does not look like tuple-at-a-time processing is on
>> the radar for Spark, though I have seen articles stating that more effort
>> is going to go into the Spark SQL layer in Spark streaming which may make
>> it more reminiscent of Esper.
>>
>> For these reasons, I have not even tried to implement CEP in Spark. I
>> feel it's a waste of time without immediate tuple-at-a-time processing.
>> Without this, they avoid the whole problem of "back pressure" (though keep
>> in mind, it is still very possible to overload the Spark streaming layer
>> with stages that will continue to pile up and never get worked off) but
>> they lose the granular control that you get in CEP environments by allowing
>> the rules & processors to react with the receipt of each tuple, right away.
>>
>> Awhile back, I did attempt to implement an InfoSphere Streams-like API
>> [1] on top of Apache Storm as an example of what such a design may look
>> like. It looks like Storm is going to be replaced in the not so distant
>> future by Twitter's new design called Heron. IIRC, Heron does not have an
>> open source implementation as of yet.
>>
>> [1] https://github.com/calrissian/flowmix
>>
>> On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Corey,
>>>
>>> Can you please point me to docs on using Spark for CEP? Do we have a set
>>> of CEP libraries somewhere. I am keen on getting hold of adaptor libraries
>>> for Spark something like below
>>>
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>

Re: Apache Flink

2016-04-17 Thread Corey Nolet
i have not been intrigued at all by the microbatching concept in Spark. I
am used to CEP in real streams processing environments like Infosphere
Streams & Storm where the granularity of processing is at the level of each
individual tuple and processing units (workers) can react immediately to
events being received and processed. The closest Spark streaming comes to
this concept is the notion of "state" that can be updated via the
"updateStateByKey()" functions, which are only able to be run in a
microbatch. Looking at the expected design changes to Spark Streaming in
Spark 2.0.0, it also does not look like tuple-at-a-time processing is on
the radar for Spark, though I have seen articles stating that more effort
is going to go into the Spark SQL layer in Spark streaming which may make
it more reminiscent of Esper.
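
To make that concrete, here's a minimal sketch of updateStateByKey() with
placeholder host, port and checkpoint path: the update function fires once per
key per batch interval (5 seconds here), not per individual tuple, which is
exactly what makes reactive CEP awkward on top of it.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="stateful-word-counts")
ssc = StreamingContext(sc, 5)                  # 5-second micro-batches
ssc.checkpoint("/tmp/streaming-checkpoint")    # required for stateful ops

def update_count(new_values, running_count):
    # Invoked once per key per micro-batch, not per tuple
    return sum(new_values) + (running_count or 0)

counts = (ssc.socketTextStream("localhost", 9999)
             .flatMap(lambda line: line.split())
             .map(lambda word: (word, 1))
             .updateStateByKey(update_count))
counts.pprint()

ssc.start()
ssc.awaitTermination()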

For these reasons, I have not even tried to implement CEP in Spark. I feel
it's a waste of time without immediate tuple-at-a-time processing. Without
this, they avoid the whole problem of "back pressure" (though keep in mind,
it is still very possible to overload the Spark streaming layer with stages
that will continue to pile up and never get worked off) but they lose the
granular control that you get in CEP environments by allowing the rules &
processors to react with the receipt of each tuple, right away.

Awhile back, I did attempt to implement an InfoSphere Streams-like API [1]
on top of Apache Storm as an example of what such a design may look like.
It looks like Storm is going to be replaced in the not so distant future by
Twitter's new design called Heron. IIRC, Heron does not have an open source
implementation as of yet.

[1] https://github.com/calrissian/flowmix

On Sun, Apr 17, 2016 at 3:11 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi Corey,
>
> Can you please point me to docs on using Spark for CEP? Do we have a set
> of CEP libraries somewhere. I am keen on getting hold of adaptor libraries
> for Spark something like below
>
>
>
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 April 2016 at 16:07, Corey Nolet <cjno...@gmail.com> wrote:
>
>> One thing I've noticed about Flink in my following of the project has
>> been that it has established, in a few cases, some novel ideas and
>> improvements over Spark. The problem with it, however, is that both the
>> development team and the community around it are very small and many of
>> those novel improvements have been rolled directly into Spark in subsequent
>> versions. I was considering changing over my architecture to Flink at one
>> point to get better, more real-time CEP streaming support, but in the end I
>> decided to stick with Spark and just watch Flink continue to pressure it
>> into improvement.
>>
>> On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>>
>>> i never found much info that flink was actually designed to be fault
>>> tolerant. if fault tolerance is more bolt-on/add-on/afterthought then that
>>> doesn't bode well for large scale data processing. spark was designed with
>>> fault tolerance in mind from the beginning.
>>>
>>> On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I read the benchmark published by Yahoo. Obviously they already use
>>>> Storm and are inevitably very familiar with that tool. To start with, although
>>>> these benchmarks were somewhat interesting IMO, they lent themselves to an
>>>> assurance that the tool chosen for their platform is still the best choice.
>>>> So inevitably the benchmarks and the tests were done primarily to support
>>>> their approach.
>>>>
>>>> In general, anything which is not done through the TPC council or a similar
>>>> body is questionable.
>>>> Their argument is that because Spark handles data streaming in micro-batches,
>>>> it inevitably introduces this built-in latency by design.
>>>> In contrast, both Storm and Flink do not (at face value) have this
>>>> issue.
>>>>
>>>> In addition, as we already know, Spark has far more capabilities compared
>>>> to Flink (I know nothing about Storm). So really it boils down to the
>>>> business SLA to choose which tool one wants to deploy for your use case.
>>>> IMO Sp

Re: Apache Flink

2016-04-17 Thread Corey Nolet
One thing I've noticed about Flink in my following of the project has been
that it has established, in a few cases, some novel ideas and improvements
over Spark. The problem with it, however, is that both the development team
and the community around it are very small and many of those novel
improvements have been rolled directly into Spark in subsequent versions. I
was considering changing over my architecture to Flink at one point to get
better, more real-time CEP streaming support, but in the end I decided to
stick with Spark and just watch Flink continue to pressure it into
improvement.

On Sun, Apr 17, 2016 at 11:03 AM, Koert Kuipers  wrote:

> i never found much info that flink was actually designed to be fault
> tolerant. if fault tolerance is more bolt-on/add-on/afterthought then that
> doesn't bode well for large scale data processing. spark was designed with
> fault tolerance in mind from the beginning.
>
> On Sun, Apr 17, 2016 at 9:52 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> I read the benchmark published by Yahoo. Obviously they already use Storm
>> and are inevitably very familiar with that tool. To start with, although these
>> benchmarks were somewhat interesting IMO, they lent themselves to an assurance
>> that the tool chosen for their platform is still the best choice. So
>> inevitably the benchmarks and the tests were done primarily to support their
>> approach.
>>
>> In general, anything which is not done through the TPC council or a similar body
>> is questionable.
>> Their argument is that because Spark handles data streaming in micro-batches,
>> it inevitably introduces this built-in latency by design.
>> In contrast, both Storm and Flink do not (at face value) have this
>> issue.
>>
>> In addition, as we already know, Spark has far more capabilities compared
>> to Flink (I know nothing about Storm). So really it boils down to the
>> business SLA to choose which tool one wants to deploy for your use case.
>> IMO Spark's micro-batching approach is probably OK for 99% of use cases. If
>> we had built-in libraries for CEP in Spark (I am searching for them), I
>> would not bother with Flink.
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 April 2016 at 12:47, Ovidiu-Cristian MARCU <
>> ovidiu-cristian.ma...@inria.fr> wrote:
>>
>>> You probably read this benchmark at Yahoo, any comments from Spark?
>>>
>>> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>>>
>>>
>>> On 17 Apr 2016, at 12:41, andy petrella  wrote:
>>>
>>> Just adding one thing to the mix: `that the latency for streaming data
>>> is eliminated` is insane :-D
>>>
>>> On Sun, Apr 17, 2016 at 12:19 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
  It seems that Flink argues that the latency for streaming data is
 eliminated whereas with Spark RDD there is this latency.

 I noticed that Flink does not support an interactive shell much like the Spark
 shell, where you can add jars to it to do Kafka testing. The advice was to
 add the streaming Kafka jar file to the CLASSPATH, but that does not work.

 Most Flink documentation is also rather sparse, with the usual word count
 example, which is not exactly what you want.

 Anyway I will have a look at it further. I have a Spark Scala streaming
 Kafka program that works fine in Spark and I want to recode it using Scala
 for Flink with Kafka but have difficulty importing and testing libraries.

 Cheers

 Dr Mich Talebzadeh


 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *


 http://talebzadehmich.wordpress.com



 On 17 April 2016 at 02:41, Ascot Moss  wrote:

> I compared both last month, seems to me that Flink's MLLib is not yet
> ready.
>
> On Sun, Apr 17, 2016 at 12:23 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks Ted. I was wondering if someone is using both :)
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 16 April 2016 at 17:08, Ted Yu  wrote:
>>
>>> Looks like this question is more relevant on flink mailing list :-)
>>>
>>> On Sat, Apr 16, 2016 at 8:52 AM, Mich Talebzadeh <
>>> 

Re: Shuffle guarantees

2016-03-01 Thread Corey Nolet
Nevermind, a look @ the ExternalSorter class tells me that the iterator for
each key that's only partially ordered ends up being merge-sorted by
equality after the fact. I wanted to post my findings on here for others who
may have the same questions.
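
A tiny sanity-check sketch of the behavior (made-up keys, not Spark internals):
even with every key forced into the same partition, values are only merged for
keys that compare equal.

from pyspark import SparkContext

sc = SparkContext(appName="shuffle-equality-check")

# One partition forces every key to "collide" at the partition level
pairs = sc.parallelize([("apple", 1), ("banana", 2), ("apple", 3)])
reduced = pairs.reduceByKey(lambda a, b: a + b, numPartitions=1)

# Values are merged only for equal keys: [('apple', 4), ('banana', 2)]
print(sorted(reduced.collect()))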


On Tue, Mar 1, 2016 at 3:05 PM, Corey Nolet <cjno...@gmail.com> wrote:

> The reason I'm asking, I see a comment in the ExternalSorter class that
> says this:
>
> "If we need to aggregate by key, we either use a total ordering from the
> ordering parameter, or read the keys with the same hash code and compare
> them with each other for equality to merge values".
>
> How can this be assumed if the object used for the key (for instance, in
> the case where a HashPartitioner is used) does not guarantee an ordering, and
> therefore a comparator cannot be assumed to exist?
>
> On Tue, Mar 1, 2016 at 2:56 PM, Corey Nolet <cjno...@gmail.com> wrote:
>
>> So if I'm using reduceByKey() with a HashPartitioner, I understand that
>> the hashCode() of my key is used to create the underlying shuffle files.
>>
>> Is anything other than hashCode() used in the shuffle files when the data
>> is pulled into the reducers and run through the reduce function? The reason
>> I'm asking is because there's a possibility of hashCode() colliding in two
>> different objects which end up hashing to the same number, right?
>>
>>
>>
>


Re: Shuffle guarantees

2016-03-01 Thread Corey Nolet
The reason I'm asking, I see a comment in the ExternalSorter class that
says this:

"If we need to aggregate by key, we either use a total ordering from the
ordering parameter, or read the keys with the same hash code and compare
them with each other for equality to merge values".

How can this be assumed if the object used for the key (for instance, in
the case where a HashPartitioner is used) does not guarantee an ordering, and
therefore a comparator cannot be assumed to exist?

On Tue, Mar 1, 2016 at 2:56 PM, Corey Nolet <cjno...@gmail.com> wrote:

> So if I'm using reduceByKey() with a HashPartitioner, I understand that
> the hashCode() of my key is used to create the underlying shuffle files.
>
> Is anything other than hashCode() used in the shuffle files when the data
> is pulled into the reducers and run through the reduce function? The reason
> I'm asking is because there's a possibility of hashCode() colliding in two
> different objects which end up hashing to the same number, right?
>
>
>


Shuffle guarantees

2016-03-01 Thread Corey Nolet
So if I'm using reduceByKey() with a HashPartitioner, I understand that the
hashCode() of my key is used to create the underlying shuffle files.

Is anything other than hashCode() used in the shuffle files when the data
is pulled into the reducers and run through the reduce function? The reason
I'm asking is because there's a possibility of hashCode() colliding in two
different objects which end up hashing to the same number, right?


Re: Shuffle memory woes

2016-02-08 Thread Corey Nolet
I sure do! [1] And yes- I'm really hoping they will chime in, otherwise I
may dig a little deeper myself and start posting some jira tickets.

[1] http://www.slideshare.net/cjnolet

On Mon, Feb 8, 2016 at 3:02 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> It's interesting to see what spark dev people will say.
> Corey do you have presentation available online?
>
> On 8 February 2016 at 05:16, Corey Nolet <cjno...@gmail.com> wrote:
>
>> Charles,
>>
>> Thank you for chiming in and I'm glad someone else is experiencing this
>> too and not just me. I know very well how the Spark shuffles work and I've
>> done deep dive presentations @ Spark meetups in the past. This problem is
>> something that goes beyond that and, I believe, it exposes a fundamental
>> paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
>> it can be fixed.
>>
>> Also- in regards to how much data actually gets shuffled- believe it or
>> not this problem can take a 30-40 minute job and make it run for 4 or more
>> hours. If I let the job run for 4+ hours, the amount of data being shuffled
>> for this particular dataset will be 100 or more TB. Usually, however, I end
>> up killing the job long before that point because I realize it should not
>> be taking this long. The particular dataset we're doing is not for
>> real-time exploration. These are very large joins we're doing for jobs that
>> we run a few times a day.
>>
>> On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xpnc54byp...@gmail.com>
>> wrote:
>>
>>>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>>>
>>> -- I have had the same experiences, although not to this extreme (the
>>> spills were < 10T while the input was ~100s of GB) and haven't found any
>>> solution yet. I don't believe this is related to the input data format. In my
>>> case, I got my input data by loading from Hive tables.
>>>
>>> On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:
>>>
>>>> Hi Corey:
>>>>    "The dataset is 100gb at most, the spills can go up to 10T-100T". Are
>>>> your input files in LZO format, and do you use sc.textFile()? If there is not
>>>> enough memory, Spark will spill 3-4x of the input data to disk.
>>>>
>>>>
>>>> -- Original Message --
>>>> *From:* "Corey Nolet";<cjno...@gmail.com>;
>>>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>>>> *To:* "Igor Berman"<igor.ber...@gmail.com>;
>>>> *Cc:* "user"<user@spark.apache.org>;
>>>> *Subject:* Re: Shuffle memory woes
>>>>
>>>> As for the second part of your questions- we have a fairly complex join
>>>> process which requires a ton of stage orchestration from our driver. I've
>>>> written some code to be able to walk down our DAG tree and execute siblings
>>>> in the tree concurrently where possible (forcing cache to disk on children
>>>> that have multiple children themselves so that they can be run
>>>> concurrently). Ultimately, we have seen significant speedup in our jobs by
>>>> keeping tasks as busy as possible processing concurrent stages. Funny
>>>> enough though, the stage that is causing problems with shuffling for us has
>>>> a lot of children and doesn't even run concurrently with any other stages
>>>> so I ruled out the concurrency of the stages as a culprit for the
>>>> shuffling problem we're seeing.
>>>>
>>>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> Igor,
>>>>>
>>>>> I don't think the question is "why can't it fit stuff in memory". I
>>>>> know why it can't fit stuff in memory- because it's a large dataset that
>>>>> needs to have a reduceByKey() run on it. My understanding is that when it
>>>>> doesn't fit into memory it needs to spill in order to consolidate
>>>>> intermediary files into a single file. The more data you need to run
>>>>> through this, the more it will need to spill. My finding is that once it
>>>>> gets stuck in this spill chain with our dataset it's all over @ that point
>>>>> because it will spill and spill and spill and spill and spill. If I give
>>>>> the shuffle enough memory it won't- irrespective of the number of
>>>>> partitions we have (i've done everything from repartition(500) to
>>>>> repartition(250

Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
As for the second part of your questions- we have a fairly complex join
process which requires a ton of stage orchestration from our driver. I've
written some code to be able to walk down our DAG tree and execute siblings
in the tree concurrently where possible (forcing cache to disk on children
that have multiple children themselves so that they can be run
concurrently). Ultimately, we have seen significant speedup in our jobs by
keeping tasks as busy as possible processing concurrent stages. Funny
enough though, the stage that is causing problems with shuffling for us has
a lot of children and doesn't even run concurrently with any other stages
so I ruled out the concurrency of the stages as a culprit for the
shuffling problem we're seeing.

On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:

> Igor,
>
> I don't think the question is "why can't it fit stuff in memory". I know
> why it can't fit stuff in memory- because it's a large dataset that needs
> to have a reduceByKey() run on it. My understanding is that when it doesn't
> fit into memory it needs to spill in order to consolidate intermediary
> files into a single file. The more data you need to run through this, the
> more it will need to spill. My finding is that once it gets stuck in this
> spill chain with our dataset it's all over @ that point because it will
> spill and spill and spill and spill and spill. If I give the shuffle enough
> memory it won't- irrespective of the number of partitions we have (i've
> done everything from repartition(500) to repartition(2500)). It's not a
> matter of running out of memory on a single node because the data is
> skewed. It's more a matter of the shuffle buffer filling up and needing to
> spill. I think what may be happening is that it gets to a point where it's
> spending more time reading/writing from disk while doing the spills than it
> is actually processing any data. I can tell this because I can see that the
> spills sometimes get up into the 10's to 100's of TB where the input data
> was maybe 100gb at most. Unfortunately my code is on a private internal
> network and I'm not able to share it.
>
> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> so can you provide code snippets: especially it's interesting to see what
>> are your transformation chain, how many partitions are there on each side
>> of shuffle operation
>>
>> the question is why it can't fit stuff in memory when you are shuffling -
>> maybe your partitioner on "reduce" side is not configured properly? I mean
>> if map side is ok, and you just reducing by key or something it should be
>> ok, so some detail is missing...skewed data? aggregate by key?
>>
>> On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Igor,
>>>
>>> Thank you for the response but unfortunately, the problem I'm referring
>>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>>> map side but didn't do much for the spilling when there was no longer any
>>> memory left for the shuffle. Also, the new auto-memory management doesn't
>>> seem like it'll have too much of an effect after I've already given most of
>>> the memory I've allocated to the shuffle. The problem I'm having is most
>>> specifically related to the shuffle performance declining by several orders
>>> of magnitude when it needs to spill multiple times (it ends up spilling
>>> several hundred times for me when it can't fit stuff into memory).
>>>
>>>
>>>
>>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> usually you can solve this by 2 steps
>>>> make rdd to have more partitions
>>>> play with shuffle memory fraction
>>>>
>>>> in spark 1.6 cache vs shuffle memory fractions are adjusted
>>>> automatically
>>>>
>>>> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>>>>
>>>>> I just recently had a discovery that my jobs were taking several hours
>>>>> to complete because of excess shuffle spills. What I found was that when
>>>>> I hit the high point where I didn't have enough memory for the shuffles to
>>>>> store all of their file consolidations at once, it could spill so many
>>>>> times that it causes my job's runtime to increase by orders of magnitude
>>>>> (and sometimes fail altogether).
>>>>>
>>

Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
Igor,

I don't think the question is "why can't it fit stuff in memory". I know
why it can't fit stuff in memory- because it's a large dataset that needs
to have a reduceByKey() run on it. My understanding is that when it doesn't
fit into memory it needs to spill in order to consolidate intermediary
files into a single file. The more data you need to run through this, the
more it will need to spill. My finding is that once it gets stuck in this
spill chain with our dataset it's all over @ that point because it will
spill and spill and spill and spill and spill. If I give the shuffle enough
memory it won't- irrespective of the number of partitions we have (i've
done everything from repartition(500) to repartition(2500)). It's not a
matter of running out of memory on a single node because the data is
skewed. It's more a matter of the shuffle buffer filling up and needing to
spill. I think what may be happening is that it gets to a point where it's
spending more time reading/writing from disk while doing the spills than it
is actually processing any data. I can tell this because I can see that the
spills sometimes get up into the 10's to 100's of TB where the input data
was maybe 100gb at most. Unfortunately my code is on a private internal
network and I'm not able to share it.

On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> so can you provide code snippets: especially it's interesting to see what
> are your transformation chain, how many partitions are there on each side
> of shuffle operation
>
> the question is why it can't fit stuff in memory when you are shuffling -
> maybe your partitioner on "reduce" side is not configured properly? I mean
> if map side is ok, and you just reducing by key or something it should be
> ok, so some detail is missing...skewed data? aggregate by key?
>
> On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:
>
>> Igor,
>>
>> Thank you for the response but unfortunately, the problem I'm referring
>> to goes beyond this. I have set the shuffle memory fraction to be 90% and
>> set the cache memory to be 0. Repartitioning the RDD helped a tad on the
>> map side but didn't do much for the spilling when there was no longer any
>> memory left for the shuffle. Also, the new auto-memory management doesn't
>> seem like it'll have too much of an effect after I've already given most of
>> the memory I've allocated to the shuffle. The problem I'm having is most
>> specifically related to the shuffle performance declining by several orders
>> of magnitude when it needs to spill multiple times (it ends up spilling
>> several hundred times for me when it can't fit stuff into memory).
>>
>>
>>
>> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> usually you can solve this by 2 steps
>>> make rdd to have more partitions
>>> play with shuffle memory fraction
>>>
>>> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>>>
>>> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>>>
>>>> I just recently had a discovery that my jobs were taking several hours
>>>> to complete because of excess shuffle spills. What I found was that when
>>>> I hit the high point where I didn't have enough memory for the shuffles to
>>>> store all of their file consolidations at once, it could spill so many
>>>> times that it causes my job's runtime to increase by orders of magnitude
>>>> (and sometimes fail altogether).
>>>>
>>>> I've played with all the tuning parameters I can find. To speed the
>>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>>> shuffle buffering a tad (both up and down).
>>>>
>>>> I feel like I see a weak point here. The mappers are sharing memory
>>>> space with reducers and the shuffles need enough memory to consolidate and
>>>> pull otherwise they will need to spill and spill and spill. What i've
>>>> noticed about my jobs is that this is a difference between them taking 30
>>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>>
>>>> I've found that, as a result of the spilling, I'm better off not
>>>> caching any data in memory and lowering my storage fraction to 0 and still
>>>> hoping I was able to give my shuffles enough memory that my data doesn't
>>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>>> because it seems like it forces the memory limits on my job- otherwise it
>>>> could take orders of magnitude longer to execute.
>>>>
>>>>
>>>
>>
>


Re: Shuffle memory woes

2016-02-07 Thread Corey Nolet
Charles,

Thank you for chiming in and I'm glad someone else is experiencing this too
and not just me. I know very well how the Spark shuffles work and I've done
deep dive presentations @ Spark meetups in the past. This problem is
something that goes beyond that and, I believe, it exposes a fundamental
paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
it can be fixed.

Also- in regards to how much data actually gets shuffled- believe it or not
this problem can take a 30-40 minute job and make it run for 4 or more
hours. If I let the job run for 4+ hours, the amount of data being shuffled
for this particular dataset will be 100 or more TB. Usually, however, I end
up killing the job long before that point because I realize it should not
be taking this long. The particular dataset we're doing is not for
real-time exploration. These are very large joins we're doing for jobs that
we run a few times a day.

On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xpnc54byp...@gmail.com> wrote:

>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>
> -- I have had the same experiences, although not to this extreme (the
> spills were < 10T while the input was ~100s of GB) and haven't found any
> solution yet. I don't believe this is related to the input data format. In my
> case, I got my input data by loading from Hive tables.
>
> On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:
>
>> Hi Corey:
>> "The dataset is 100gb at most, the spills can go up to 10T-100T". Are
>> your input files in LZO format, and do you use sc.textFile()? If there is not
>> enough memory, Spark will spill 3-4x of the input data to disk.
>>
>>
>> -- Original Message --
>> *From:* "Corey Nolet";<cjno...@gmail.com>;
>> *Sent:* Sunday, February 7, 2016, 8:56 PM
>> *To:* "Igor Berman"<igor.ber...@gmail.com>;
>> *Cc:* "user"<user@spark.apache.org>;
>> *Subject:* Re: Shuffle memory woes
>>
>> As for the second part of your questions- we have a fairly complex join
>> process which requires a ton of stage orchestration from our driver. I've
>> written some code to be able to walk down our DAG tree and execute siblings
>> in the tree concurrently where possible (forcing cache to disk on children
>> that that have multiple chiildren themselves so that they can be run
>> concurrently). Ultimatey, we have seen significant speedup in our jobs by
>> keeping tasks as busy as possible processing concurrent stages. Funny
>> enough though, the stage that is causing problems with shuffling for us has
>> a lot of children and doesn't even run concurrently with any other stages
>> so I ruled out the concurrency of the stages as a culprit for the
>> shuffling problem we're seeing.
>>
>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> Igor,
>>>
>>> I don't think the question is "why can't it fit stuff in memory". I know
>>> why it can't fit stuff in memory- because it's a large dataset that needs
>>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>>> fit into memory it needs to spill in order to consolidate intermediary
>>> files into a single file. The more data you need to run through this, the
>>> more it will need to spill. My finding is that once it gets stuck in this
>>> spill chain with our dataset it's all over @ that point because it will
>>> spill and spill and spill and spill and spill. If I give the shuffle enough
>>> memory it won't- irrespective of the number of partitions we have (i've
>>> done everything from repartition(500) to repartition(2500)). It's not a
>>> matter of running out of memory on a single node because the data is
>>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>>> spill. I think what may be happening is that it gets to a point where it's
>>> spending more time reading/writing from disk while doing the spills than it
>>> is actually processing any data. I can tell this because I can see that the
>>> spills sometimes get up into the 10's to 100's of TB where the input data
>>> was maybe 100gb at most. Unfortunately my code is on a
>>> private internal network and I'm not able to share it.
>>>
>>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> so can you provide code snippets: especially it's interesting to see
>>>> what are your transformation chain, how many partitions are there on each
>>>> side of shuffle operation

Re: Help needed in deleting a message posted in Spark User List

2016-02-06 Thread Corey Nolet
The whole purpose of Apache mailing lists is that the messages get indexed
all over the web so that discussions and questions/solutions can be
searched easily by Google and other engines.

For this reason, and because the messages are sent via email as Steve pointed
out, it's just not possible to retract the messages.

On Sat, Feb 6, 2016 at 10:21 AM, Steve Loughran 
wrote:

>
> > On 5 Feb 2016, at 17:35, Marcelo Vanzin  wrote:
> >
> > You don't... just send a new one.
> >
> > On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
> >  wrote:
> >> Hi,
> >>
> >> I want to edit/delete a message posted in Spark User List. How do I do
> that?
> >>
> >> Thanks!
> >
> >
> >
>
> it isn't technically possible
>
> http://apache.org/foundation/public-archives.html
>
> People do occasionally ask on the infrastructure mailing list to do
> this, but they aren't in a position to do anything about the copies that
> end up in the mailboxes of every subscriber.
>
> Don't worry about it; we've all done things like post internal stack
> traces, accidentally mail the wrong list, etc, etc.
>
> Now, accidentally breaking the nightly build of everything, that's
> somewhat embarrassing —but you haven't done that and it's been ~4 months
> since I've done that myself.
>
>
> -Steve


Re: Shuffle memory woes

2016-02-06 Thread Corey Nolet
Igor,

Thank you for the response but unfortunately, the problem I'm referring to
goes beyond this. I have set the shuffle memory fraction to be 90% and set
the cache memory to be 0. Repartitioning the RDD helped a tad on the map
side but didn't do much for the spilling when there was no longer any
memory left for the shuffle. Also, the new auto-memory management doesn't
seem like it'll have too much of an effect after I've already given most of
the memory I've allocated to the shuffle. The problem I'm having is most
specifically related to the shuffle performance declining by several orders
of magnitude when it needs to spill multiple times (it ends up spilling
several hundred times for me when it can't fit stuff into memory).



On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com> wrote:

> Hi,
> usually you can solve this by 2 steps
> make rdd to have more partitions
> play with shuffle memory fraction
>
> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>
> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>
>> I just recently had a discovery that my jobs were taking several hours to
>> complete because of excess shuffle spills. What I found was that when I
>> hit the high point where I didn't have enough memory for the shuffles to
>> store all of their file consolidations at once, it could spill so many
>> times that it causes my job's runtime to increase by orders of magnitude
>> (and sometimes fail altogether).
>>
>> I've played with all the tuning parameters I can find. To speed the
>> shuffles up, I tuned the akka threads to different values. I also tuned the
>> shuffle buffering a tad (both up and down).
>>
>> I feel like I see a weak point here. The mappers are sharing memory space
>> with reducers and the shuffles need enough memory to consolidate and pull
>> otherwise they will need to spill and spill and spill. What i've noticed
>> about my jobs is that this is a difference between them taking 30 minutes
>> and 4 hours or more. Same job- just different memory tuning.
>>
>> I've found that, as a result of the spilling, I'm better off not caching
>> any data in memory and lowering my storage fraction to 0 and still hoping I
>> was able to give my shuffles enough memory that my data doesn't
>> continuously spill. Is this the way it's supposed to be? It makes it hard
>> because it seems like it forces the memory limits on my job- otherwise it
>> could take orders of magnitude longer to execute.
>>
>>
>


Shuffle memory woes

2016-02-05 Thread Corey Nolet
I just recently had a discovery that my jobs were taking several hours to
complete because of excess shuffle spills. What I found was that when I
hit the high point where I didn't have enough memory for the shuffles to
store all of their file consolidations at once, it could spill so many
times that it causes my job's runtime to increase by orders of magnitude
(and sometimes fail altogether).

I've played with all the tuning parameters I can find. To speed the
shuffles up, I tuned the akka threads to different values. I also tuned the
shuffle buffering a tad (both up and down).

I feel like I see a weak point here. The mappers are sharing memory space
with reducers and the shuffles need enough memory to consolidate and pull
otherwise they will need to spill and spill and spill. What i've noticed
about my jobs is that this is a difference between them taking 30 minutes
and 4 hours or more. Same job- just different memory tuning.

I've found that, as a result of the spilling, I'm better off not caching
any data in memory and lowering my storage fraction to 0 and still hoping I
was able to give my shuffles enough memory that my data doesn't
continuously spill. Is this the way it's supposed to be? It makes it hard
because it seems like it forces the memory limits on my job- otherwise it
could take orders of magnitude longer to execute.
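
For concreteness, this is the kind of (legacy, pre-1.6) tuning I've been doing,
sketched with illustrative values only: push nearly all of the execution memory
to the shuffle and keep essentially nothing for caching.

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("shuffle-heavy-job")
        # Legacy Spark 1.x fractions; replaced by unified memory management in 1.6
        .set("spark.shuffle.memoryFraction", "0.9")
        .set("spark.storage.memoryFraction", "0.0"))
sc = SparkContext(conf=conf)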


Re: ROSE: Spark + R on the JVM.

2016-01-12 Thread Corey Nolet
David,

Thank you very much for announcing this! It looks like it could be very
useful. Would you mind providing a link to the github?

On Tue, Jan 12, 2016 at 10:03 AM, David 
wrote:

> Hi all,
>
> I'd like to share news of the recent release of a new Spark package, ROSE.
>
>
> ROSE is a Scala library offering access to the full scientific computing
> power of the R programming language to Apache Spark batch and streaming
> applications on the JVM. Where Apache SparkR lets data scientists use Spark
> from R, ROSE is designed to let Scala and Java developers use R from Spark.
>
> The project is available and documented on GitHub and I would encourage
> you to take a look. Any feedback, questions etc very welcome.
>
> David
>
> "All that is gold does not glitter, Not all those who wander are lost."
>


Re: MongoDB and Spark

2015-09-11 Thread Corey Nolet
Unfortunately, MongoDB does not directly expose its locality via its client
API so the problem with trying to schedule Spark tasks against it is that
the tasks themselves cannot be scheduled locally on nodes containing query
results- which means you can only assume most results will be sent over the
network to the task that needs to process it. This is bad. The other reason
(which is also related to the issue of locality) is that I'm not sure if
there's an easy way to spread the results of a query over multiple
different clients- thus you'd probably have to start your Spark RDD with a
single partition and then repartition. What you've done at that point is
you've taken data from multiple mongodb nodes and you've collected them on
a single node just to re-partition them, again across the network, onto
multiple nodes. This is also bad.

I think this is the reason it was recommended to use MongoDB's mapreduce
because they can use their locality information internally. I had this same
issue w/ Couchbase a couple years back- it's unfortunate but it's the
reality.




On Fri, Sep 11, 2015 at 9:34 AM, Sandeep Giri 
wrote:

> I think it should be possible by loading collections as RDD and then doing
> a union on them.
>
> Regards,
> Sandeep Giri,
> +1 347 781 4573 (US)
> +91-953-899-8962 (IN)
>
> www.KnowBigData.com. 
> Phone: +1-253-397-1945 (Office)
>
>
>
> On Fri, Sep 11, 2015 at 3:40 PM, Mishra, Abhishek <
> abhishek.mis...@xerox.com> wrote:
>
>> Anything using Spark RDD’s ???
>>
>>
>>
>> Abhishek
>>
>>
>>
>> *From:* Sandeep Giri [mailto:sand...@knowbigdata.com]
>> *Sent:* Friday, September 11, 2015 3:19 PM
>> *To:* Mishra, Abhishek; user@spark.apache.org; d...@spark.apache.org
>> *Subject:* Re: MongoDB and Spark
>>
>>
>>
>> use map-reduce.
>>
>>
>>
>> On Fri, Sep 11, 2015, 14:32 Mishra, Abhishek 
>> wrote:
>>
>> Hello ,
>>
>>
>>
>> Is there any way to query multiple collections from MongoDB using Spark
>> and Java? And I want to create only one Configuration object. Please help
>> if anyone has something regarding this.
>>
>>
>>
>>
>>
>> Thank You
>>
>> Abhishek
>>
>>
>


Re: What is the reason for ExecutorLostFailure?

2015-08-18 Thread Corey Nolet
Usually more information as to the cause of this will be found down in your
logs. I generally see this happen when an out of memory exception has
occurred for one reason or another on an executor. It's possible your
memory settings are too small per executor, or the number of concurrent
tasks you are running is too large for some of the executors. Other times,
it's possible that using RDD functions like groupBy(), which collect an
unbounded number of items into memory, could be causing it.
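
As a toy illustration of that last point (made-up data, not your job):
groupByKey() materializes every value for a key in a single task, while
reduceByKey() combines map-side first and is usually the safer pattern when
executors are dying with memory errors.

from pyspark import SparkContext

sc = SparkContext(appName="avoid-unbounded-groups")
events = sc.parallelize([("user1", 3), ("user2", 5), ("user1", 7)])

# Risky on huge/skewed keys: all values for a key are pulled into memory at once
grouped_totals = events.groupByKey().mapValues(sum)

# Usually safer: partial sums are combined before the shuffle
reduced_totals = events.reduceByKey(lambda a, b: a + b)
print(reduced_totals.collect())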

Either way, the logs for the executors should be able to give you some
insight, have you looked at those yet?

On Tue, Aug 18, 2015 at 6:26 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io
 wrote:

 Hi All

 Why am I getting ExecutorLostFailure, and why are executors completely lost
 for the rest of the processing? Eventually it makes the job fail. One thing is
 for sure: a lot of shuffling happens across executors in my program.

 Is there a way to understand and debug ExecutorLostFailure? Any pointers
 regarding “ExecutorLostFailure” would help me a lot.

 Thanks
 Vijay



Re: Newbie question: what makes Spark run faster than MapReduce

2015-08-07 Thread Corey Nolet
1) Spark only needs to shuffle when data needs to be partitioned around the
workers in an all-to-all fashion.
2) Multi-stage jobs that would normally require several MapReduce jobs (thus
causing data to be dumped to disk between the jobs) can instead keep their
intermediate data cached in memory.
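
A quick sketch of point 2 (the input path is a placeholder): an intermediate
RDD reused by two downstream computations stays cached in memory instead of
being re-read or re-computed, where chained MapReduce jobs would round-trip
through disk.

from pyspark import SparkContext

sc = SparkContext(appName="cache-between-stages")
words = (sc.textFile("/data/input.txt")
           .flatMap(lambda line: line.split())
           .cache())                       # kept in memory after the first action

total = words.count()                      # materializes and caches the RDD
distinct = words.distinct().count()        # reuses the cached data, no re-read
print(total, distinct)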


SparkConf ignoring keys

2015-08-05 Thread Corey Nolet
I've been using SparkConf on my project for quite some time now to store
configuration information for its various components. This has worked very
well thus far in situations where I have control over the creation of the
SparkContext and the SparkConf.

I have run into a bit of a problem trying to integrate this same approach
to the use of the shell, however. I have a bunch of properties in a
properties file that are shared across several different types of
applications (web containers, etc...) but the SparkConf ignores these
properties because they aren't prefixed with spark.*

Is this really necessary? It's not really stopping people from adding their
own properties and it limits the power of being able to utilize one central
configuration object.
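
For what it's worth, the workaround I've been falling back to (made-up property
names below) is to namespace application settings under a spark.* prefix so
they survive, then read them back off the conf:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("central-config")
        # Made-up keys, prefixed with "spark." so they aren't dropped
        .set("spark.myapp.input.path", "/data/incoming")
        .set("spark.myapp.batch.size", "500"))
sc = SparkContext(conf=conf)

batch_size = int(conf.get("spark.myapp.batch.size"))
input_path = conf.get("spark.myapp.input.path")

The same prefixed keys can also be passed as --conf options to spark-submit.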


Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode

2015-07-28 Thread Corey Nolet
On Tue, Jul 28, 2015 at 2:17 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Thanks Corey for your answer,

 Do you mean that final status : SUCCEEDED in terminal logs means that
 YARN RM could clean the resources after the application has finished
 (application finishing does not necessarily mean succeeded or failed) ?

 Correct.


 With that logic it totally makes sense.

 Basically the YARN logs does not say anything about the Spark job itself.
 It just says that Spark job resources have been cleaned up after the job
 completed and returned back to Yarn.


If you have log aggregation enabled on your cluster, the yarn logs command
should give you any exceptions that were thrown in the driver / executors
when you are running in yarn cluster mode. If you were running in
yarn-client mode, you'd see the errors that caused a job to fail in your
local log (errors that would cause a job to fail will be caught by the
SparkContext on the driver) because the driver is running locally instead
of being deployed in a yarn container. Also, using the Spark HistoryServer
will give you a more visual insight into the exact problems (like which
partitions failed, which executors died trying to process them, etc...)



 It would be great if the Yarn logs could also say something about the outcome
 of the job, because the user is more interested in the job's final status.


This is just an artifact of running with yarn-cluster mode. It's still easy
enough to run the yarn logs command to see all the logs (you can grep for
the node designated as the application master to find any exceptions in
your driver that may show you why your application failed).  The
HistoryServer would still give you enough information after the fact to see
the failures.

Generally, I submit my jobs in yarn-client mode while I'm testing so that I
can spot errors right away. I generally only use yarn-cluster mode for jobs
that are deployed onto operational hardware- that way if a job does fail, I
can still use yarn logs to find out why, but I don't need a local process
running on the machine that submitted the job taking up resources (see the
waitForAppCompletion property introduced into Spark 1.4).

I'll also caveat my response and say that I have not used Spark's Python
API so I can only give you a general overview of how the Yarn integration
works from the Scala point of view.


Hope this helps.


 Detailed Yarn-related logs can be found in the RM, NM, DN, and NN log files.

 Thanks again.

 On Mon, Jul 27, 2015 at 7:45 PM, Corey Nolet cjno...@gmail.com wrote:

 Elkhan,

 What does the ResourceManager say about the final status of the job?
 Spark jobs that run as Yarn applications can fail but still successfully
 clean up their resources and give them back to the Yarn cluster. Because of
 this, there's a difference between your code throwing an exception in an
 executor/driver and the Yarn application failing. Generally you'll see a
 yarn application fail when there's a memory problem (too much memory being
 allocated, or not enough, causing executors to fail multiple times and not
 allowing your job to finish).

 What I'm seeing from your post is that you had an exception in your
 application which was caught by the Spark framework which then proceeded to
 clean up the job and shut itself down- which it did successfully. When you
 aren't running in the Yarn modes, you aren't seeing any Yarn status that's
 telling you the Yarn application was successfully shut down, you are just
 seeing the failure(s) from your drivers/executors.



 On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Any updates on this bug ?

 Why do the Spark log results and the job's final status not match? (one saying
 that the job has failed, another stating that the job has succeeded)

 Thanks.


 On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 While running the Spark word count Python example with an intentional mistake
 in *Yarn cluster mode*, the Spark terminal states the final status as
 SUCCEEDED, but the log files state the correct results, indicating that the
 job failed.

 Why do the terminal log output and the application log output contradict each other?

 If I run the same job in *local mode* then the terminal logs and the application
 logs match, where both state that the job has failed due to the expected error
 in the Python script.

 More details: Scenario

 While running Spark Word count python example on *Yarn cluster mode*,
 if I make intentional error in wordcount.py by changing this line (I'm
 using Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0
 versions - which I tested):

 lines = sc.textFile(sys.argv[1], 1)

 into this line:

 lines = sc.textFile(*nonExistentVariable*,1)

 where nonExistentVariable variable was never created and initialized.

 then i run that example with this command (I put README.md into HDFS
 before running this command):

 *./bin/spark-submit --master yarn-cluster wordcount.py /README.md*

 The job runs and finishes successfully according to the log

Re: [ Potential bug ] Spark terminal logs say that job has succeeded even though job has failed in Yarn cluster mode

2015-07-27 Thread Corey Nolet
Elkhan,

What does the ResourceManager say about the final status of the job?  Spark
jobs that run as Yarn applications can fail but still successfully clean up
their resources and give them back to the Yarn cluster. Because of this,
there's a difference between your code throwing an exception in an
executor/driver and the Yarn application failing. Generally you'll see a
yarn application fail when there's a memory problem (too much memory being
allocated or not enough causing executors to fail multiple times not
allowing your job to finish).

What I'm seeing from your post is that you had an exception in your
application which was caught by the Spark framework which then proceeded to
clean up the job and shut itself down- which it did successfully. When you
aren't running in the Yarn modes, you aren't seeing any Yarn status that's
telling you the Yarn application was successfully shut down, you are just
seeing the failure(s) from your drivers/executors.



On Mon, Jul 27, 2015 at 2:11 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Any updates on this bug ?

 Why do the Spark log results and the job's final status not match? (one saying
 that the job has failed, another stating that the job has succeeded)

 Thanks.


 On Thu, Jul 23, 2015 at 4:43 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 While running the Spark word count Python example with an intentional mistake in
 *Yarn cluster mode*, the Spark terminal states the final status as SUCCEEDED, but
 the log files state the correct results, indicating that the job failed.

 Why do the terminal log output and the application log output contradict each other?

 If I run the same job in *local mode* then the terminal logs and the application
 logs match, where both state that the job has failed due to the expected error
 in the Python script.

 More details: Scenario

 While running Spark Word count python example on *Yarn cluster mode*, if
 I make intentional error in wordcount.py by changing this line (I'm using
 Spark 1.4.1, but this problem exists in Spark 1.4.0 and in 1.3.0 versions -
 which I tested):

 lines = sc.textFile(sys.argv[1], 1)

 into this line:

 lines = sc.textFile(*nonExistentVariable*,1)

 where nonExistentVariable variable was never created and initialized.

 then i run that example with this command (I put README.md into HDFS
 before running this command):

 *./bin/spark-submit --master yarn-cluster wordcount.py /README.md*

 The job runs and finishes successfully according to the log printed in the
 terminal:
 *Terminal logs*:
 ...
 15/07/23 16:19:17 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:18 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:19 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:20 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: RUNNING)
 15/07/23 16:19:21 INFO yarn.Client: Application report for
 application_1437612288327_0013 (state: FINISHED)
 15/07/23 16:19:21 INFO yarn.Client:
  client token: N/A
  diagnostics: Shutdown hook called before final status was reported.
  ApplicationMaster host: 10.0.53.59
  ApplicationMaster RPC port: 0
  queue: default
  start time: 1437693551439
  final status: *SUCCEEDED*
  tracking URL:
 http://localhost:8088/proxy/application_1437612288327_0013/history/application_1437612288327_0013/1
  user: edadashov
 15/07/23 16:19:21 INFO util.Utils: Shutdown hook called
 15/07/23 16:19:21 INFO util.Utils: Deleting directory
 /tmp/spark-eba0a1b5-a216-4afa-9c54-a3cb67b16444

 But if look at log files generated for this application in HDFS - it
 indicates failure of the job with correct reason:
 *Application log files*:
 ...
 \00 stdout\00 179Traceback (most recent call last):
   File "wordcount.py", line 32, in <module>
 lines = sc.textFile(nonExistentVariable,1)
 *NameError: name 'nonExistentVariable' is not defined*


 Why is the terminal output - final status: *SUCCEEDED* - not matching the
 application log results - failure of the job (NameError: name
 'nonExistentVariable' is not defined)?

 Is this bug ? Is there Jira ticket related to this issue ? (Is someone
 assigned to this issue ?)

 If i run this wordcount .py example (with mistake line) in local mode,
 then terminal log states that the job has failed in terminal logs too.

 *./bin/spark-submit wordcount.py /README.md*

 *Terminal logs*:

 ...
 15/07/23 16:31:55 INFO scheduler.EventLoggingListener: Logging events to
 hdfs:///app-logs/local-1437694314943
 Traceback (most recent call last):
   File "/home/edadashov/tools/myspark/spark/wordcount.py", line 32, in
 <module>
 lines = sc.textFile(nonExistentVariable,1)
 NameError: name 'nonExistentVariable' is not defined
 15/07/23 16:31:55 INFO spark.SparkContext: Invoking stop() from shutdown
 hook


 Thanks.




 --

 Best regards,
 Elkhan Dadashov



MapType vs StructType

2015-07-17 Thread Corey Nolet
I notice JSON objects are all parsed as Map[String,Any] in Jackson but for
some reason, the inferSchema tools in Spark SQL extracts the schema of
nested JSON objects as StructTypes.

This makes it really confusing when trying to rectify the object hierarchy
when I have maps because the Catalyst conversion layer underneath is
expecting a Row or Product and not a Map.

Why wasn't MapType used here? Is there any significant difference between
the two of these types that would cause me not to use a MapType when I'm
constructing my own schema representing a set of nested Map[String,_]'s?


Re: MapType vs StructType

2015-07-17 Thread Corey Nolet
This helps immensely. Thanks Michael!

On Fri, Jul 17, 2015 at 4:33 PM, Michael Armbrust mich...@databricks.com
wrote:

 I'll add there is a JIRA to override the default past some threshold of #
 of unique keys: https://issues.apache.org/jira/browse/SPARK-4476
 https://issues.apache.org/jira/browse/SPARK-4476

 On Fri, Jul 17, 2015 at 1:32 PM, Michael Armbrust mich...@databricks.com
 wrote:

 The difference between a map and a struct here is that in a struct all
 possible keys are defined as part of the schema and can each can have a
 different type (and we don't support union types).  JSON doesn't have
 differentiated data structures so we go with the one that gives you more
 information when doing inference by default.  If you pass in a schema to
 JSON however, you can override this and have a JSON object parsed as a map.
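 A minimal sketch of the schema override described above (field names and the
 input path are illustrative; assumes an existing sqlContext and Spark 1.4+):

 import org.apache.spark.sql.types._

 // "attributes" is forced to MapType instead of the inferred StructType
 val schema = StructType(Seq(
   StructField("id", StringType),
   StructField("attributes", MapType(StringType, StringType))
 ))

 val df = sqlContext.read.schema(schema).json("events.json")
 df.printSchema()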

 On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote:

 I notice JSON objects are all parsed as Map[String,Any] in Jackson but
 for some reason, the inferSchema tools in Spark SQL extracts the schema
 of nested JSON objects as StructTypes.

 This makes it really confusing when trying to rectify the object
 hierarchy when I have maps because the Catalyst conversion layer underneath
 is expecting a Row or Product and not a Map.

 Why wasn't MapType used here? Is there any significant difference
 between the two of these types that would cause me not to use a MapType
 when I'm constructing my own schema representing a set of nested
 Map[String,_]'s?








Re: map vs mapPartitions

2015-06-25 Thread Corey Nolet
Also,

I've noticed that .map() actually creates a MapPartitionsRDD under the
hood. So I think the real difference is just in the API that's being
exposed. You can do a map() and not have to think about the partitions at
all or you can do a .mapPartitions() and be able to do things like chunking
of the data in the partition (fetching more than 1 record @ a time).
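For illustration, a minimal sketch of that difference (input data and partition
count are made up; assumes an existing SparkContext sc): per-partition setup
runs once per partition under mapPartitions(), while the map() equivalent
repeats it for every record.

import java.text.SimpleDateFormat

val dates = sc.parallelize(Seq("2015-06-25", "2015-06-26", "2015-06-27"), 2)

// setup happens once per partition; records still stream through one at a time
val timestamps = dates.mapPartitions { iter =>
  val fmt = new SimpleDateFormat("yyyy-MM-dd")
  iter.map(s => fmt.parse(s).getTime)
}

// the map() version pays the setup cost once per record instead
val timestampsPerRecord = dates.map(s => new SimpleDateFormat("yyyy-MM-dd").parse(s).getTime)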

On Thu, Jun 25, 2015 at 12:19 PM, Corey Nolet cjno...@gmail.com wrote:

 I don't know exactly what's going on under the hood but I would not assume
 that just because a whole partition is not being pulled into memory @ one
 time that that means each record is being pulled at 1 time. That's the
 beauty of exposing Iterators & Iterables in an API rather than collections-
 there's a bunch of buffering that can be hidden from the user to make the
 iterations as efficient as they can be.

 On Thu, Jun 25, 2015 at 11:36 AM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 yes, 1 partition per core and  mapPartitions apply function on each
 partition.

 Question is Does complete partition loads in memory so that function can
 be applied to it or its an iterator and iterator.next() loads next record
 and if yes then how is it efficient than map which also works on 1 record
 at a time.


 Is the only difference is -- only while loop as in below runs per record
 as in map . But code above that will be run once per partition.


 public Iterable<Integer> call(Iterator<String> input)
 throws Exception {
 List<Integer> output = new ArrayList<Integer>();
 while(input.hasNext()){
 output.add(input.next().length());
  }


 so if I don't have any heavy code above while loop, performance will be
 same as of map function.



 On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren inv...@gmail.com wrote:

 It's not the number of executors that matters, but the # of the CPU
 cores of your cluster.

 Each partition will be loaded on a core for computing.

 e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD in 24
 partitions (24 tasks for narrow dependency).
 Then all the 24 partitions will be loaded to your cluster in parallel,
 one on each core.
 You may notice that some tasks will finish more quickly than others. So
 divide the RDD into (2~3) x (# of cores) for better pipeline performance.
 Say we have 72 partitions in your RDD, then initially 24 tasks run on 24
 cores, then first done first served until all 72 tasks are processed.

 Back to your origin question, map and mapPartitions are both
 transformation, but on different granularity.
 map = apply the function on each record in each partition.
 mapPartitions = apply the function on each partition.
 But the rule is the same, one partition per core.

 Hope it helps.
 Hao




 On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 say source is HDFS,And file is divided in 10 partitions. so what will
 be  input contains.

  public Iterable<Integer> call(Iterator<String> input)

 say I have 10 executors in job each having single partition.

 will it have some part of partition or complete. And if some when I
 call input.next() - it will fetch rest or how is it handled ?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep complete partitions in memory of executor as
  iterable.
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {
 
  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while(input.hasNext()){
  output.add(input.next().length());
  }
  return output;
  }
 
  });
 
 
  Here does input is present in memory and can contain complete
 partition of
  gbs ?
  Will this function call(IteratorString input) is called only for
 no of
  partitions(say if I have 10 in this example) times. Not no of lines
  times(say 1000) .
 
 
  And whats the use of mapPartitionsWithIndex ?
 
  Thanks
 





 --
 Hao Ren

 Data Engineer @ leboncoin

 Paris, France






Re: map vs mapPartitions

2015-06-25 Thread Corey Nolet
I don't know exactly what's going on under the hood but I would not assume
that just because a whole partition is not being pulled into memory @ one
time that that means each record is being pulled at 1 time. That's the
beauty of exposing Iterators & Iterables in an API rather than collections-
there's a bunch of buffering that can be hidden from the user to make the
iterations as efficient as they can be.

On Thu, Jun 25, 2015 at 11:36 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 yes, 1 partition per core and  mapPartitions apply function on each
 partition.

 Question is Does complete partition loads in memory so that function can
 be applied to it or its an iterator and iterator.next() loads next record
 and if yes then how is it efficient than map which also works on 1 record
 at a time.


 Is the only difference is -- only while loop as in below runs per record
 as in map . But code above that will be run once per partition.


 public Iterable<Integer> call(Iterator<String> input)
 throws Exception {
 List<Integer> output = new ArrayList<Integer>();
 while(input.hasNext()){
 output.add(input.next().length());
  }


 so if I don't have any heavy code above while loop, performance will be
 same as of map function.



 On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren inv...@gmail.com wrote:

 It's not the number of executors that matters, but the # of the CPU cores
 of your cluster.

 Each partition will be loaded on a core for computing.

 e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD in 24
 partitions (24 tasks for narrow dependency).
 Then all the 24 partitions will be loaded to your cluster in parallel,
 one on each core.
 You may notice that some tasks will finish more quickly than others. So
 divide the RDD into (2~3) x (# of cores) for better pipeline performance.
 Say we have 72 partitions in your RDD, then initially 24 tasks run on 24
 cores, then first done first served until all 72 tasks are processed.

 Back to your origin question, map and mapPartitions are both
 transformation, but on different granularity.
 map = apply the function on each record in each partition.
 mapPartitions = apply the function on each partition.
 But the rule is the same, one partition per core.

 Hope it helps.
 Hao




 On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 say source is HDFS,And file is divided in 10 partitions. so what will be
  input contains.

  public Iterable<Integer> call(Iterator<String> input)

 say I have 10 executors in job each having single partition.

 will it have some part of partition or complete. And if some when I call
 input.next() - it will fetch rest or how is it handled ?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep complete partitions in memory of executor as
  iterable.
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {
 
  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while(input.hasNext()){
  output.add(input.next().length());
  }
  return output;
  }
 
  });
 
 
  Here does input is present in memory and can contain complete
 partition of
  gbs ?
  Will this function call(IteratorString input) is called only for no
 of
  partitions(say if I have 10 in this example) times. Not no of lines
  times(say 1000) .
 
 
  And whats the use of mapPartitionsWithIndex ?
 
  Thanks
 





 --
 Hao Ren

 Data Engineer @ leboncoin

 Paris, France





Reducer memory usage

2015-06-21 Thread Corey Nolet
I've seen a few places where it's been mentioned that after a shuffle each
reducer needs to pull its partition into memory in its entirety. Is this
true? I'd assume the merge sort that needs to be done (in the cases where
sortByKey() is not used) wouldn't need to pull all of the data into memory
at once... is it the sort for the sortByKey() that requires this to be done?


Re: Grouping elements in a RDD

2015-06-20 Thread Corey Nolet
If you use rdd.mapPartitions(), you'll be able to get a hold of the
iterators for each partiton. Then you should be able to do
iterator.grouped(size) on each of the partitions. I think it may mean you
have 1 element at the end of each partition that may have less than size
elements. If that's okay for you then that should work.
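A minimal sketch of that approach (chunk size and input are illustrative;
assumes an existing SparkContext sc). Note that the last chunk of each
partition may hold fewer than the requested number of elements, as mentioned
above:

val rdd = sc.parallelize(1 to 100, 4)

// RDD[Seq[Int]]: each element is a group of up to 10 records from one partition
val chunks = rdd.mapPartitions(_.grouped(10))

chunks.collect().foreach(chunk => println(chunk.mkString(",")))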

On Sat, Jun 20, 2015 at 7:48 PM, Brandon White bwwintheho...@gmail.com
wrote:

 How would you do a .grouped(10) on a RDD, is it possible? Here is an
 example for a Scala list

 scala List(1,2,3,4).grouped(2).toList
 res1: List[List[Int]] = List(List(1, 2), List(3, 4))

 Would like to group n elements.



Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Corey Nolet
Sorry Du,

Repartition means coalesce(shuffle = true) as per [1]. They are the same
operation. Coalescing with shuffle = false means you are specifying the maximum
number of partitions after the coalesce (if there are fewer partitions to begin
with, you will end up with that lesser amount).


[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L341
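For illustration, a minimal sketch of the equivalence (partition counts are
made up; assumes an existing SparkContext sc):

val rdd = sc.parallelize(1 to 1000, 100)

val viaRepartition = rdd.repartition(10)              // identical to the next line
val viaCoalesce    = rdd.coalesce(10, shuffle = true)

// shuffle = false only merges parent partitions, so it can never increase the count
val merged = rdd.coalesce(10)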


On Thu, Jun 18, 2015 at 7:55 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 repartition() means coalesce(shuffle=false)



   On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com
 wrote:


 Doesn't repartition call coalesce(shuffle=true)?
 On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 I got the same problem with rdd,repartition() in my streaming app, which
 generated a few huge partitions and many tiny partitions. The resulting
 high data skew makes the processing time of a batch unpredictable and often
 exceeding the batch interval. I eventually solved the problem by using
 rdd.coalesce() instead, which however is expensive as it yields a lot of
 shuffle traffic and also takes a long time.

 Du



   On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com
 wrote:


 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
 still puts everything into the same partition.

 We did manage to improve the situation by making a new partitioner that
 extends HashPartitioner.  It treats certain exception keys differently.
 These keys that are known to appear very often are assigned random
 partitions instead of using the existing partitioning mechanism.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org








Coalescing with shuffle = false in imbalanced cluster

2015-06-18 Thread Corey Nolet
I'm confused about this. The comment  on the function seems to indicate
that there is absolutely no shuffle or network IO but it also states that
it assigns an even number of parent partitions to each final partition
group. I'm having trouble seeing how this can be guaranteed without some
data passing around nodes.

For instance, let's say I have 5 machines and 10 partitions, but the way the
partitions are laid out is that machines 1, 2, and 3 each have 3 partitions
while machine 4 only has 1 partition and machine 5 has none. Am I to assume
that coalesce(4, false) will merge the 3 partitions on nodes 1, 2, and 3 each to
1 partition while node 4 will just remain at 1 partition?

Thanks.


Re: Is there programmatic way running Spark job on Yarn cluster without using spark-submit script ?

2015-06-18 Thread Corey Nolet
 This is not independent programmatic way of running of Spark job on Yarn
cluster.

The example I created simply demonstrates how to wire up the classpath so
that spark submit can be called programmatically. For my use case, I wanted
to hold open a connection so I could send tasks to the executors on demand.
If you were to submit this via yarn-cluster mode, it would only require any
extra files be placed on the executors, if needed.

On Wed, Jun 17, 2015 at 9:01 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 This is not independent programmatic way of running of Spark job on Yarn
 cluster.

 That example demonstrates running in *Yarn-client* mode, and it will also be
 dependent on Jetty. Users writing Spark programs do not want to depend on
 that.

 I found this SparkLauncher class introduced in Spark 1.4 version (
 https://github.com/apache/spark/tree/master/launcher) which allows
 running Spark jobs in programmatic way.

 SparkLauncher exists in Java and Scala APIs, but I could not find in
 Python API.

 Did not try it yet, but seems promising.

 Example:

 import org.apache.spark.launcher.SparkLauncher;

 public class MyLauncher {

   public static void main(String[] args) throws Exception {

     Process spark = new SparkLauncher()
       .setAppResource("/my/app.jar")
       .setMainClass("my.spark.app.Main")
       .setMaster("local")
       .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
       .launch();

     spark.waitFor();
   }
 }



 On Wed, Jun 17, 2015 at 5:51 PM, Corey Nolet cjno...@gmail.com wrote:

 An example of being able to do this is provided in the Spark Jetty Server
 project [1]

 [1] https://github.com/calrissian/spark-jetty-server

 On Wed, Jun 17, 2015 at 8:29 PM, Elkhan Dadashov elkhan8...@gmail.com
 wrote:

 Hi all,

 Is there any way running Spark job in programmatic way on Yarn cluster
 without using spark-submit script ?

 I cannot include Spark jars on my Java application (due to dependency
 conflict and other reasons), so I'll be shipping Spark assembly uber jar
 (spark-assembly-1.3.1-hadoop2.3.0.jar) to Yarn cluster, and then execute
 job (Python or Java) on Yarn-cluster.

 So is there any way running Spark job implemented in python file/Java
 class without calling it through spark-submit script ?

 Thanks.






 --

 Best regards,
 Elkhan Dadashov



Re: Shuffle produces one huge partition and many tiny partitions

2015-06-18 Thread Corey Nolet
Doesn't repartition call coalesce(shuffle=true)?
On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

 I got the same problem with rdd,repartition() in my streaming app, which
 generated a few huge partitions and many tiny partitions. The resulting
 high data skew makes the processing time of a batch unpredictable and often
 exceeding the batch interval. I eventually solved the problem by using
 rdd.coalesce() instead, which however is expensive as it yields a lot of
 shuffle traffic and also takes a long time.

 Du



   On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com
 wrote:


 Thanks for the suggestion.  Repartition didn't help us unfortunately.  It
 still puts everything into the same partition.

 We did manage to improve the situation by making a new partitioner that
 extends HashPartitioner.  It treats certain exception keys differently.
 These keys that are known to appear very often are assigned random
 partitions instead of using the existing partitioning mechanism.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Executor memory allocations

2015-06-17 Thread Corey Nolet
So I've seen in the documentation that (after the overhead memory is
subtracted), the memory allocations of each executor are as follows (assume
default settings):

60% for cache
40% for tasks to process data


Reading about how Spark implements shuffling, I've also seen it say 20% of
executor memory is utilized for shuffles. Does this 20% cut into the 40%
for tasks to process data or the 60% for the data cache?


Re: Is there programmatic way running Spark job on Yarn cluster without using spark-submit script ?

2015-06-17 Thread Corey Nolet
An example of being able to do this is provided in the Spark Jetty Server
project [1]

[1] https://github.com/calrissian/spark-jetty-server

On Wed, Jun 17, 2015 at 8:29 PM, Elkhan Dadashov elkhan8...@gmail.com
wrote:

 Hi all,

 Is there any way running Spark job in programmatic way on Yarn cluster
 without using spark-submit script ?

 I cannot include Spark jars on my Java application (due to dependency
 conflict and other reasons), so I'll be shipping Spark assembly uber jar
 (spark-assembly-1.3.1-hadoop2.3.0.jar) to Yarn cluster, and then execute
 job (Python or Java) on Yarn-cluster.

 So is there any way running Spark job implemented in python file/Java
 class without calling it through spark-submit script ?

 Thanks.





Using spark.hadoop.* to set Hadoop properties

2015-06-17 Thread Corey Nolet
I've become accustomed to being able to use system properties to override
properties in the Hadoop Configuration objects. I just recently noticed
that when Spark creates the Hadoop Configuration in the SparkContext, it
cycles through any properties prefixed with spark.hadoop. and adds those
properties to the Hadoop Configuration (minus the spark.hadoop. prefix). I don't
see this advertised anywhere in the documentation. Is this a method that is
supposed to be public to users? If so, should we add that to the
documentation?
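
For illustration, a minimal sketch of that behavior (the Hadoop property and
value are just examples):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("spark-hadoop-props")
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", "134217728")

val sc = new SparkContext(conf)

// the spark.hadoop. prefix is stripped when the SparkContext builds its Hadoop Configuration
println(sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.split.maxsize"))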


Fully in-memory shuffles

2015-06-10 Thread Corey Nolet
Is it possible to configure Spark to do all of its shuffling FULLY in
memory (given that I have enough memory to store all the data)?


Re: Fully in-memory shuffles

2015-06-10 Thread Corey Nolet
So with this... to help my understanding of Spark under the hood-

Is this statement correct: "When data needs to pass between multiple JVMs, a
shuffle will *always* hit disk"?

On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com wrote:

 There's a discussion of this at https://github.com/apache/spark/pull/5403



 On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote:

 Is it possible to configure Spark to do all of its shuffling FULLY in
 memory (given that I have enough memory to store all the data)?







Re: Fully in-memory shuffles

2015-06-10 Thread Corey Nolet
Ok so it is the case that small shuffles can be done without hitting any
disk. Is this the same case for the aux shuffle service in yarn? Can that
be done without hitting disk?

On Wed, Jun 10, 2015 at 9:17 PM, Patrick Wendell pwend...@gmail.com wrote:

 In many cases the shuffle will actually hit the OS buffer cache and
 not ever touch spinning disk if it is a size that is less than memory
 on the machine.

 - Patrick

 On Wed, Jun 10, 2015 at 5:06 PM, Corey Nolet cjno...@gmail.com wrote:
  So with this... to help my understanding of Spark under the hood-
 
  Is this statement correct When data needs to pass between multiple
 JVMs, a
  shuffle will always hit disk?
 
  On Wed, Jun 10, 2015 at 10:11 AM, Josh Rosen rosenvi...@gmail.com
 wrote:
 
  There's a discussion of this at
 https://github.com/apache/spark/pull/5403
 
 
 
  On Wed, Jun 10, 2015 at 7:08 AM, Corey Nolet cjno...@gmail.com wrote:
 
  Is it possible to configure Spark to do all of its shuffling FULLY in
  memory (given that I have enough memory to store all the data)?
 
 
 
 
 



Re: yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
Thanks Sandy- I was digging through the code in the deploy.yarn.Client and
literally found that property right before I saw your reply. I'm on 1.2.x
right now which doesn't have the property. I guess I need to update sooner
rather than later.
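
For reference, a minimal sketch combining the SparkLauncher API shown earlier
in this archive with the property Sandy mentions below (requires Spark 1.4+;
the jar path and class name are hypothetical):

import org.apache.spark.launcher.SparkLauncher

val process = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")
  .setMainClass("com.example.MyJob")
  .setMaster("yarn-cluster")
  .setConf("spark.yarn.submit.waitAppCompletion", "false")
  .launch()

process.waitFor()   // returns once the application has been submitted, without polling Yarn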

On Thu, May 28, 2015 at 3:56 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

 Hi Corey,

 As of this PR https://github.com/apache/spark/pull/5297/files, this can
 be controlled with spark.yarn.submit.waitAppCompletion.

 -Sandy

 On Thu, May 28, 2015 at 11:48 AM, Corey Nolet cjno...@gmail.com wrote:

 I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
 noticing the jvm that fires up to allocate the resources, etc... is not
 going away after the application master and executors have been allocated.
 Instead, it just sits there printing 1 second status updates to the
 console. If I kill it, my job still runs (as expected).

 Is there an intended way to stop this from happening and just have the
 local JVM die when it's done allocating the resources and deploying the
 application master?





yarn-cluster spark-submit process not dying

2015-05-28 Thread Corey Nolet
I am submitting jobs to my yarn cluster via the yarn-cluster mode and I'm
noticing the jvm that fires up to allocate the resources, etc... is not
going away after the application master and executors have been allocated.
Instead, it just sits there printing 1 second status updates to the
console. If I kill it, my job still runs (as expected).

Is there an intended way to stop this from happening and just have the
local JVM die when it's done allocating the resources and deploying the
application master?


Blocking DStream.forEachRDD()

2015-05-07 Thread Corey Nolet
Is this something I can do? I am using a FileOutputFormat inside of the
foreachRDD call. After the output format runs, I want to do some directory
cleanup and I want to block while I'm doing that. Is that something I can
do inside of this function? If not, where would I accomplish this on every
micro-batched interval?


Re: Blocking DStream.forEachRDD()

2015-05-07 Thread Corey Nolet
It does look like the function that's executed runs in the driver, so doing an
Await.result() on a thread AFTER I've executed an action should work. Just
updating this here in case anyone has this question in the future.
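
For illustration, a minimal sketch of that pattern (the output path and the
cleanup body are placeholders; dstream is assumed to be an existing DStream):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

dstream.foreachRDD { rdd =>
  rdd.saveAsTextFile(s"/tmp/out-${System.currentTimeMillis}")   // the output action

  // the body of foreachRDD runs on the driver once per micro-batch, so it can
  // block on cleanup work after the output finishes
  val cleanup = Future {
    // hypothetical directory cleanup would go here
  }
  Await.result(cleanup, 10.minutes)
}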
Is this something I can do? I am using a FileOutputFormat inside of the
foreachRDD call. After the output format runs, I want to do some directory
cleanup and I want to block while I'm doing that. Is that something I can
do inside of this function? If not, where would I accomplish this on every
micro-batched interval?


Re: real time Query engine Spark-SQL on Hbase

2015-04-30 Thread Corey Nolet
A tad off topic, but could still be relevant.

Accumulo's design is a tad different in the realm of being able to shard
and perform set intersections/unions server-side (through seeks). I've got
an adapter for Spark SQL on top of a document store implementation in
Accumulo that accepts the push-down predicates and actually performs query
on the tablet servers. This strategy may be useful to you [1].

[1]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/test/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreFilteredTest.scala

On Thu, Apr 30, 2015 at 10:54 AM, Ted Yu yuzhih...@gmail.com wrote:

 bq. a single query on one filter criteria

 Can you tell us more about your filter ? How selective is it ?

 Which hbase release are you using ?

 Cheers

 On Thu, Apr 30, 2015 at 7:23 AM, Siddharth Ubale 
 siddharth.ub...@syncoms.com wrote:

  Hi,



 I want to use Spark as Query engine on HBase with sub second latency.



 I am  using Spark 1.3  version. And followed the steps below on Hbase
 table with around 3.5 lac rows :



 1. Mapped the DataFrame to the HBase table. RDDCustomers maps to the
 HBase table which is used to create the DataFrame:

 DataFrame schemaCustomers = sqlInstance
 .createDataFrame(SparkContextImpl.getRddCustomers(), Customers.class);

 2. Registered a temp table, i.e.
 schemaCustomers.registerTempTable("customers");

 3. Running the query on the DataFrame using the SQLContext instance.



 What I am observing is that for a single query on one filter criteria the
 query is taking 7-8 seconds? And the time increases as I am increasing the
 number of rows in Hbase table. Also, there was one time when I was getting
 query response under 1-2 seconds. Seems like strange behavior.

 Is this expected behavior from Spark or am I missing something here?

 Can somebody help me understand this scenario . Please assist.



 Thanks,

 Siddharth Ubale,







Re: DAG

2015-04-25 Thread Corey Nolet
Giovanni,

The DAG can be walked by calling the dependencies() function on any RDD.
It returns a  Seq containing the parent RDDs. If you start at the leaves
and walk through the parents until dependencies() returns an empty Seq, you
ultimately have your DAG.
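
For illustration, a minimal sketch of that walk (assumes an existing
SparkContext sc):

import org.apache.spark.rdd.RDD

// collect an RDD and all of its ancestors by following dependencies()
def lineage(rdd: RDD[_]): Seq[RDD[_]] =
  rdd +: rdd.dependencies.flatMap(dep => lineage(dep.rdd))

val leaf = sc.parallelize(1 to 10).map(_ * 2).filter(_ > 5)
lineage(leaf).distinct.foreach(println)   // leaf first, the root RDD last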

On Sat, Apr 25, 2015 at 1:28 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 May be this will give you a good start
 https://github.com/apache/spark/pull/2077

 Thanks
 Best Regards

 On Sat, Apr 25, 2015 at 1:29 AM, Giovanni Paolo Gibilisco 
 gibb...@gmail.com wrote:

 Hi,
 I would like to know if it is possible to build the DAG before actually
 executing the application. My guess is that in the scheduler the DAG is
 built dynamically at runtime since it might depend on the data, but I was
 wondering if there is a way (and maybe a tool already) to analyze the code
 and buidl the DAG.

 Thank you!





Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Corey Nolet
If you return an Iterable, you are not tying the API to a CompactBuffer.
Someday, the data could be fetched lazily and the API would not have to
change.
On Apr 23, 2015 6:59 PM, Dean Wampler deanwamp...@gmail.com wrote:

 I wasn't involved in this decision (I just make the fries), but
 CompactBuffer is designed for relatively small data sets that at least fit
 in memory. It's more or less an Array. In principle, returning an iterator
 could hide the actual data structure that might be needed to hold a much
 bigger data set, if necessary.

 HOWEVER, it actually returns a CompactBuffer.


 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L444


 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Thu, Apr 23, 2015 at 5:46 PM, Hao Ren inv...@gmail.com wrote:

 Should I repost this to dev list ?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Streaming anomaly detection using ARIMA

2015-04-10 Thread Corey Nolet
Sean,

I do agree about the inside out parallelization but my curiosity is
mostly in what type of performance I can expect to have by piping out to R.
I'm playing with Twitter's new Anomaly Detection library btw, this could be
a solution if I can get the calls to R to stand up to the massive dataset
that I have.

I'll report back my findings.
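
For illustration, a minimal sketch of the piping approach discussed in this
thread (the input path and R script are hypothetical; each record is written to
the external process as one line of stdin, and each line of its stdout becomes
one output record):

val metrics = sc.textFile("hdfs:///metrics/daily")
val scored  = metrics.pipe("Rscript /opt/models/arima_score.R")
scored.take(5).foreach(println)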

On Thu, Apr 2, 2015 at 3:46 AM, Sean Owen so...@cloudera.com wrote:

 This inside out parallelization has been a way people have used R
 with MapReduce for a long time. Run N copies of an R script on the
 cluster, on different subsets of the data, babysat by Mappers. You
 just need R installed on the cluster. Hadoop Streaming makes this easy
 and things like RDD.pipe in Spark make it easier.

 So it may be just that simple and so there's not much to say about it.
 I haven't tried this with Spark Streaming but imagine it would also
 work. Have you tried this?

 Within a window you would probably take the first x% as training and
 the rest as test. I don't think there's a question of looking across
 windows.

 On Thu, Apr 2, 2015 at 12:31 AM, Corey Nolet cjno...@gmail.com wrote:
  Surprised I haven't gotten any responses about this. Has anyone tried
 using
  rJava or FastR w/ Spark? I've seen the SparkR project but thta goes the
  other way- what I'd like to do is use R for model calculation and Spark
 to
  distribute the load across the cluster.
 
  Also, has anyone used Scalation for ARIMA models?
 
  On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote:
 
  Taking out the complexity of the ARIMA models to simplify things- I
 can't
  seem to find a good way to represent even standard moving averages in
 spark
  streaming. Perhaps it's my ignorance with the micro-batched style of the
  DStreams API.
 
  On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:
 
  I want to use ARIMA for a predictive model so that I can take time
 series
  data (metrics) and perform a light anomaly detection. The time series
 data
  is going to be bucketed to different time units (several minutes within
  several hours, several hours within several days, several days within
  several years.
 
  I want to do the algorithm in Spark Streaming. I'm used to tuple at a
  time streaming and I'm having a tad bit of trouble gaining insight
 into how
  exactly the windows are managed inside of DStreams.
 
  Let's say I have a simple dataset that is marked by a key/value tuple
  where the key is the name of the component who's metrics I want to run
 the
  algorithm against and the value is a metric (a value representing a
 sum for
  the time bucket. I want to create histograms of the time series data
 for
  each key in the windows in which they reside so I can use that
 histogram
  vector to generate my ARIMA prediction (actually, it seems like this
 doesn't
  just apply to ARIMA but could apply to any sliding average).
 
  I *think* my prediction code may look something like this:
 
  val predictionAverages = dstream
.groupByKeyAndWindow(60*60*24, 60*60*24)
.mapValues(applyARIMAFunction)
 
  That is, keep 24 hours worth of metrics in each window and use that for
  the ARIMA prediction. The part I'm struggling with is how to join
 together
  the actual values so that i can do my comparison against the prediction
  model.
 
  Let's say dstream contains the actual values. For any time  window, I
  should be able to take a previous set of windows and use model to
 compare
  against the current values.
 
 
 
 



SparkR newHadoopAPIRDD

2015-04-01 Thread Corey Nolet
How hard would it be to expose this in some way? I ask because the current
textFile and objectFile functions are obviously at some point calling out
to a FileInputFormat and configuring it.

Could we get a way to configure any arbitrary inputformat / outputformat?


Re: Streaming anomaly detection using ARIMA

2015-04-01 Thread Corey Nolet
Surprised I haven't gotten any responses about this. Has anyone tried using
rJava or FastR w/ Spark? I've seen the SparkR project but that goes the
other way- what I'd like to do is use R for model calculation and Spark to
distribute the load across the cluster.

Also, has anyone used Scalation for ARIMA models?

On Mon, Mar 30, 2015 at 9:30 AM, Corey Nolet cjno...@gmail.com wrote:

 Taking out the complexity of the ARIMA models to simplify things- I can't
 seem to find a good way to represent even standard moving averages in spark
 streaming. Perhaps it's my ignorance with the micro-batched style of the
 DStreams API.

 On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:

 I want to use ARIMA for a predictive model so that I can take time series
 data (metrics) and perform a light anomaly detection. The time series data
 is going to be bucketed to different time units (several minutes within
 several hours, several hours within several days, several days within
 several years.

 I want to do the algorithm in Spark Streaming. I'm used to tuple at a
 time streaming and I'm having a tad bit of trouble gaining insight into
 how exactly the windows are managed inside of DStreams.

 Let's say I have a simple dataset that is marked by a key/value tuple
 where the key is the name of the component who's metrics I want to run the
 algorithm against and the value is a metric (a value representing a sum for
 the time bucket. I want to create histograms of the time series data for
 each key in the windows in which they reside so I can use that histogram
 vector to generate my ARIMA prediction (actually, it seems like this
 doesn't just apply to ARIMA but could apply to any sliding average).

 I *think* my prediction code may look something like this:

 val predictionAverages = dstream
   .groupByKeyAndWindow(60*60*24, 60*60*24)
   .mapValues(applyARIMAFunction)

 That is, keep 24 hours worth of metrics in each window and use that for
 the ARIMA prediction. The part I'm struggling with is how to join together
 the actual values so that i can do my comparison against the prediction
 model.

 Let's say dstream contains the actual values. For any time  window, I
 should be able to take a previous set of windows and use model to compare
 against the current values.






Re: Streaming anomaly detection using ARIMA

2015-03-30 Thread Corey Nolet
Taking out the complexity of the ARIMA models to simplify things- I can't
seem to find a good way to represent even standard moving averages in spark
streaming. Perhaps it's my ignorance with the micro-batched style of the
DStreams API.

On Fri, Mar 27, 2015 at 9:13 PM, Corey Nolet cjno...@gmail.com wrote:

 I want to use ARIMA for a predictive model so that I can take time series
 data (metrics) and perform a light anomaly detection. The time series data
 is going to be bucketed to different time units (several minutes within
 several hours, several hours within several days, several days within
 several years.

 I want to do the algorithm in Spark Streaming. I'm used to tuple at a
 time streaming and I'm having a tad bit of trouble gaining insight into
 how exactly the windows are managed inside of DStreams.

 Let's say I have a simple dataset that is marked by a key/value tuple
 where the key is the name of the component who's metrics I want to run the
 algorithm against and the value is a metric (a value representing a sum for
 the time bucket. I want to create histograms of the time series data for
 each key in the windows in which they reside so I can use that histogram
 vector to generate my ARIMA prediction (actually, it seems like this
 doesn't just apply to ARIMA but could apply to any sliding average).

 I *think* my prediction code may look something like this:

 val predictionAverages = dstream
   .groupByKeyAndWindow(60*60*24, 60*60*24)
   .mapValues(applyARIMAFunction)

 That is, keep 24 hours worth of metrics in each window and use that for
 the ARIMA prediction. The part I'm struggling with is how to join together
 the actual values so that i can do my comparison against the prediction
 model.

 Let's say dstream contains the actual values. For any time  window, I
 should be able to take a previous set of windows and use model to compare
 against the current values.





Streaming anomaly detection using ARIMA

2015-03-27 Thread Corey Nolet
I want to use ARIMA for a predictive model so that I can take time series
data (metrics) and perform a light anomaly detection. The time series data
is going to be bucketed to different time units (several minutes within
several hours, several hours within several days, several days within
several years).

I want to do the algorithm in Spark Streaming. I'm used to tuple at a
time streaming and I'm having a tad bit of trouble gaining insight into
how exactly the windows are managed inside of DStreams.

Let's say I have a simple dataset that is marked by a key/value tuple where
the key is the name of the component whose metrics I want to run the
algorithm against and the value is a metric (a value representing a sum for
the time bucket. I want to create histograms of the time series data for
each key in the windows in which they reside so I can use that histogram
vector to generate my ARIMA prediction (actually, it seems like this
doesn't just apply to ARIMA but could apply to any sliding average).

I *think* my prediction code may look something like this:

val predictionAverages = dstream
  .groupByKeyAndWindow(60*60*24, 60*60*24)
  .mapValues(applyARIMAFunction)

That is, keep 24 hours worth of metrics in each window and use that for the
ARIMA prediction. The part I'm struggling with is how to join together the
actual values so that i can do my comparison against the prediction model.

Let's say dstream contains the actual values. For any time window, I
should be able to take a previous set of windows and use model to compare
against the current values.


Re: iPython Notebook + Spark + Accumulo -- best practice?

2015-03-26 Thread Corey Nolet
Spark uses a SerializableWritable [1] to java serialize writable objects.
I've noticed (at least in Spark 1.2.1) that it breaks down with some
objects when Kryo is used instead of regular java serialization. Though it
is  wrapping the actual AccumuloInputFormat (another example of something
you may want to do in the future), we have Accumulo working to load data
from a table into Spark SQL [2]. The way Spark uses the InputFormat is very
straightforward.

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
[2]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala#L76

On Thu, Mar 26, 2015 at 3:06 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:

 I'm guessing the Accumulo Key and Value classes are not serializable, so
 you would need to do something like

 val rdd = sc.newAPIHadoopRDD(...).map { case (key, value) =>
 (extractScalaType(key), extractScalaType(value)) }

 Where 'extractScalaType' converts the key or Value to a standard Scala type
 or case class or whatever - basically extracts the data from the Key or
 Value in a form usable in Scala

 —
 Sent from Mailbox https://www.dropbox.com/mailbox


 On Thu, Mar 26, 2015 at 8:59 PM, Russ Weeks rwe...@newbrightidea.com
 wrote:

 Hi, David,

 This is the code that I use to create a JavaPairRDD from an Accumulo
 table:

  JavaSparkContext sc = new JavaSparkContext(conf);
 Job hadoopJob = Job.getInstance(conf, "TestSparkJob");
 hadoopJob.setInputFormatClass(AccumuloInputFormat.class);
 AccumuloInputFormat.setZooKeeperInstance(hadoopJob,
 conf.get(ZOOKEEPER_INSTANCE_NAME,
 conf.get(ZOOKEEPER_HOSTS))
 );
 AccumuloInputFormat.setConnectorInfo(hadoopJob,
 conf.get(ACCUMULO_AGILE_USERNAME),
 new PasswordToken(conf.get(ACCUMULO_AGILE_PASSWORD))
 );
 AccumuloInputFormat.setInputTableName(hadoopJob, conf.get(ACCUMULO_TABLE_NAME));
 AccumuloInputFormat.setScanAuthorizations(hadoopJob, auths);
 JavaPairRDD<Key, Value> values =
 sc.newAPIHadoopRDD(hadoopJob.getConfiguration(), AccumuloInputFormat.class,
 Key.class, Value.class);

 Key.class and Value.class are from org.apache.accumulo.core.data. I use a
 WholeRowIterator so that the Value is actually an encoded representation of
 an entire logical row; it's a useful convenience if you can be sure that
 your rows always fit in memory.

 I haven't tested it since Spark 1.0.1 but I doubt anything important has
 changed.

 Regards,
 -Russ


 On Thu, Mar 26, 2015 at 11:41 AM, David Holiday dav...@annaisystems.com
 wrote:

  * progress!*

 i was able to figure out why the 'input INFO not set' error was
 occurring. the eagle-eyed among you will no doubt see the following code is
 missing a closing '('

 AbstractInputFormat.setConnectorInfo(jobConf, root, new 
 PasswordToken(password)

 as I'm doing this in spark-notebook, I'd been clicking the execute
 button and moving on because I wasn't seeing an error. what I forgot was
 that notebook is going to do what spark-shell will do when you leave off a
 closing ')' -- *it will wait forever for you to add it*. so the error
 was the result of the 'setConnectorInfo' method never getting executed.

 unfortunately, I'm still unable to shove the accumulo table data into an
 RDD that's useable to me. when I execute

 rddX.count

 I get back

 res15: Long = 1

 which is the correct response - there are 10,000 rows of data in the
 table I pointed to. however, when I try to grab the first element of data
 thusly:

 rddX.first

 I get the following error:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0 in stage 0.0 (TID 0) had a not serializable result:
 org.apache.accumulo.core.data.Key

 any thoughts on where to go from here?
   DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
   dav...@annaisystems.com



 www.AnnaiSystems.com

  On Mar 26, 2015, at 8:35 AM, David Holiday dav...@annaisystems.com
 wrote:

  hi Nick

 Unfortunately the Accumulo docs are woefully inadequate, and in some
 places, flat wrong. I'm not sure if this is a case where the docs are 'flat
 wrong', or if there's some wrinke with spark-notebook in the mix that's
 messing everything up. I've been working with some people on stack overflow
 on this same issue (including one of the people from the spark-notebook
 team):


 http://stackoverflow.com/questions/29244530/how-do-i-create-a-spark-rdd-from-accumulo-1-6-in-spark-notebook?noredirect=1#comment46755938_29244530

 if you click the link you can see the entire thread of code, responses
 from notebook, etc. I'm going to try invoking the same techniques both from
 within a stand-alone scala problem and from the shell itself to see if I
 can get some traction. I'll report back when I have more data.

 cheers (and thx!)



 DAVID HOLIDAY
  Software Engineer
  760 607 3300 | Office
  312 758 8385 | Mobile
  

Re: [SparkSQL] How to calculate stddev on a DataFrame?

2015-03-25 Thread Corey Nolet
I would do sum of squares. This would allow you to keep an ongoing value as an
associative operation (in an aggregator) and then calculate the variance &
std deviation after the fact.
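
For illustration, a minimal sketch of that idea (df and the column name
"metric" are assumptions): count, sum and sum of squares are all associative,
so they can be accumulated in a single pass and the variance/stddev derived
afterwards.

val values = df.select("metric").rdd.map(_.getDouble(0))

val (n, total, totalSq) = values
  .map(v => (1L, v, v * v))
  .reduce { case ((c1, s1, q1), (c2, s2, q2)) => (c1 + c2, s1 + s2, q1 + q2) }

val mean     = total / n
val variance = totalSq / n - mean * mean    // population variance
val stddev   = math.sqrt(variance)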

On Wed, Mar 25, 2015 at 10:28 PM, Haopu Wang hw...@qilinsoft.com wrote:

  Hi,



 I have a DataFrame object and I want to do types of aggregations like
 count, sum, variance, stddev, etc.



 DataFrame has DSL to do simple aggregations like count and sum.



 How about variance and stddev?



 Thank you for any suggestions!





StreamingListener

2015-03-11 Thread Corey Nolet
Given the following scenario:

dstream.map(...).filter(...).window(...).foreachRDD()

When would the onBatchCompleted fire?


Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Corey Nolet
Thanks for taking this on Ted!

On Sat, Feb 28, 2015 at 4:17 PM, Ted Yu yuzhih...@gmail.com wrote:

 I have created SPARK-6085 with pull request:
 https://github.com/apache/spark/pull/4836

 Cheers

 On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote:

 +1 to a better default as well.

  We were working fine until we ran against a real dataset which was much
 larger than the test dataset we were using locally. It took me a couple
 days and digging through many logs to figure out this value was what was
 causing the problem.

 On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote:

 Having good out-of-box experience is desirable.

 +1 on increasing the default.


 On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote:

 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com
 wrote:
  hey,
  running my first map-red like (meaning disk-to-disk, avoiding in
 memory
  RDDs) computation in spark on yarn i immediately got bitten by a too
 low
  spark.yarn.executor.memoryOverhead. however it took me about an hour
 to find
  out this was the cause. at first i observed failing shuffles leading
 to
  restarting of tasks, then i realized this was because executors could
 not be
  reached, then i noticed in containers got shut down and reallocated in
  resourcemanager logs (no mention of errors, it seemed the containers
  finished their business and shut down successfully), and finally i
 found the
  reason in nodemanager logs.
 
  i dont think this is a pleasent first experience. i realize
  spark.yarn.executor.memoryOverhead needs to be set differently from
  situation to situation. but shouldnt the default be a somewhat higher
 value
  so that these errors are unlikely, and then the experts that are
 willing to
  deal with these errors can tune it lower? so why not make the default
 10%
  instead of 7%? that gives something that works in most situations out
 of the
  box (at the cost of being a little wasteful). it worked for me.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: bitten by spark.yarn.executor.memoryOverhead

2015-02-28 Thread Corey Nolet
+1 to a better default as well.

We were working fine until we ran against a real dataset which was much
larger than the test dataset we were using locally. It took me a couple
days and digging through many logs to figure out this value was what was
causing the problem.
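
For reference, a minimal sketch of raising the overhead (the values are only an
example; the property is in megabytes, and the default at the time of this
thread was 7% of executor memory with a 384MB floor, as discussed below):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "8g")
  .set("spark.yarn.executor.memoryOverhead", "1024")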

On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote:

 Having good out-of-box experience is desirable.

 +1 on increasing the default.


 On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote:

 There was a recent discussion about whether to increase or indeed make
 configurable this kind of default fraction. I believe the suggestion
 there too was that 9-10% is a safer default.

 Advanced users can lower the resulting overhead value; it may still
 have to be increased in some cases, but a fatter default may make this
 kind of surprise less frequent.

 I'd support increasing the default; any other thoughts?

 On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote:
  hey,
  running my first map-red like (meaning disk-to-disk, avoiding in memory
  RDDs) computation in spark on yarn i immediately got bitten by a too low
  spark.yarn.executor.memoryOverhead. however it took me about an hour to
 find
  out this was the cause. at first i observed failing shuffles leading to
  restarting of tasks, then i realized this was because executors could
 not be
  reached, then i noticed in containers got shut down and reallocated in
  resourcemanager logs (no mention of errors, it seemed the containers
  finished their business and shut down successfully), and finally i
 found the
  reason in nodemanager logs.
 
  i dont think this is a pleasent first experience. i realize
  spark.yarn.executor.memoryOverhead needs to be set differently from
  situation to situation. but shouldnt the default be a somewhat higher
 value
  so that these errors are unlikely, and then the experts that are
 willing to
  deal with these errors can tune it lower? so why not make the default
 10%
  instead of 7%? that gives something that works in most situations out
 of the
  box (at the cost of being a little wasteful). it worked for me.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Missing shuffle files

2015-02-28 Thread Corey Nolet
Just wanted to point out- raising the memory overhead (as I saw in the logs)
was the fix for this issue and I have not seen dying executors since this
value was increased.

On Tue, Feb 24, 2015 at 3:52 AM, Anders Arpteg arp...@spotify.com wrote:

 If you thinking of the yarn memory overhead, then yes, I have increased
 that as well. However, I'm glad to say that my job finished successfully
 finally. Besides the timeout and memory settings, performing repartitioning
 (with shuffling) at the right time seems to be the key to make this large
 job succeed. With all the transformations in the job, the partition
 distribution was becoming increasingly skewed. Not easy to figure out when
 and to what number of partitions to set, and takes forever to tweak these
 settings since it's works perfectly for small datasets and you'll have to
 experiment with large time-consuming jobs. Imagine if there was an
 automatic partition reconfiguration function that automagically did that...


 On Tue, Feb 24, 2015 at 3:20 AM, Corey Nolet cjno...@gmail.com wrote:

 I *think* this may have been related to the default memory overhead
  setting being too low. I raised the value to 1G and tried my job again
  but I had to leave the office before it finished. It did get further but
 I'm not exactly sure if that's just because i raised the memory. I'll see
 tomorrow- but i have a suspicion this may have been the cause of the
 executors being killed by the application master.
 On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got
 over 6000 partitions for some of these RDDs which immediately blows the
 heap somehow- I'm still not exactly sure how. If I coalesce them down to
 about 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any dfferences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com
 wrote:

 Sounds very similar to what I experienced Corey. Something that seems
 to at least help with my problems is to have more partitions. Am already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing at late as possible and avoiding
 too few in the beginning, the problems seems to decrease. Also, increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
  significantly (~700 secs), the problems seem to almost disappear. Don't
  want to celebrate yet, still a long way left before the job completes but it's
 looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactor: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundExcception:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file for directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this 
 period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's catching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine.The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on 
 memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just

Re: Kafka DStream Parallelism

2015-02-27 Thread Corey Nolet
This was what I was thinking but wanted to verify. Thanks Sean!

On Fri, Feb 27, 2015 at 9:56 PM, Sean Owen so...@cloudera.com wrote:

 The coarsest level at which you can parallelize is topic. Topics are
 all but unrelated to each other so can be consumed independently. But
 you can parallelize within the context of a topic too.

 A Kafka group ID defines a consumer group. One consumer in a group
 receives each message sent to the topic that group is listening to. Topics
 can have partitions too. You can thus make N consumers in a group
 listening to N partitions and each will effectively be listening to a
 partition.

 Yes, my understanding is that multiple receivers in one group are the
 way to consume a topic's partitions in parallel.

 On Sat, Feb 28, 2015 at 12:56 AM, Corey Nolet cjno...@gmail.com wrote:
  Looking @ [1], it seems to recommend pulling from multiple Kafka topics in
  order to parallelize data received from Kafka over multiple nodes. I notice
  in [2], however, that one of the createStream() functions takes a groupId.
  So am I understanding correctly that creating multiple DStreams with the
  same groupId allows data to be partitioned across many nodes on a single
  topic?
 
  [1]
 
 http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
  [2]
 
 https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$
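
For reference, a minimal sketch of the multiple-receivers-in-one-group setup
described above (Spark 1.2-era receiver API; the ZooKeeper quorum, group id,
topic name and receiver count are all illustrative assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("kafka-parallel-receive")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// One receiver per topic partition is the usual upper bound worth running.
val numReceivers = 4
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1))
}

// Union the per-receiver streams so downstream processing sees a single DStream.
val unified = ssc.union(streams)
unified.print()

ssc.start()
ssc.awaitTermination()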



Kafka DStream Parallelism

2015-02-27 Thread Corey Nolet
Looking @ [1], it seems to recommend pulling from multiple Kafka topics in
order to parallelize data received from Kafka over multiple nodes. I notice
in [2], however, that one of the createStream() functions takes a
groupId. So am I understanding correctly that creating multiple DStreams
with the same groupId allows data to be partitioned across many nodes on a
single topic?

[1]
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
[2]
https://spark.apache.org/docs/1.2.0/api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$


Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
I think I'm getting more confused the longer this thread goes. So
rdd1.dependencies provides immediate parents to rdd1. For now i'm going to
walk my internal DAG from the root down and see where running the caching
of siblings concurrently gets me.

I still like your point, Sean, about trying to do this at the partition
level. I'll try something to see where I can get with that next.
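
A minimal sketch of that walk (assuming plain object equality on the parent
RDDs is good enough for the check; shared parents may be visited more than
once since lineage is a DAG):

import org.apache.spark.rdd.RDD
import scala.annotation.tailrec

// Returns true if `parent` appears anywhere in `child`'s lineage.
def dependsOn(child: RDD[_], parent: RDD[_]): Boolean = {
  @tailrec
  def walk(frontier: List[RDD[_]]): Boolean = frontier match {
    case Nil => false
    case head :: tail =>
      if (head == parent) true
      else walk(head.dependencies.map(_.rdd).toList ::: tail)
  }
  walk(child.dependencies.map(_.rdd).toList)
}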

On Thu, Feb 26, 2015 at 7:46 PM, Sean Owen so...@cloudera.com wrote:

 I think we already covered that in this thread. You get dependencies
 from RDD.dependencies()

 On Fri, Feb 27, 2015 at 12:31 AM, Zhan Zhang zzh...@hortonworks.com
 wrote:
  Currently in spark, it looks like there is no easy way to know the
  dependencies. It is solved at run time.
 
  Thanks.
 
  Zhan Zhang
 
  On Feb 26, 2015, at 4:20 PM, Corey Nolet cjno...@gmail.com wrote:
 
  Ted. That one I know. It was the dependency part I was curious about



How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
Let's say I'm given 2 RDDs and told to store them in a sequence file and
they have the following dependency:

val rdd1 = sparkContext.sequenceFile().cache()
val rdd2 = rdd1.map()


How would I tell programmatically without being the one who built rdd1 and
rdd2 whether or not rdd2 depends on rdd1?

I'm working on a concurrency model for my application and I won't
necessarily know how the two rdds are constructed. What I will know is
whether or not rdd1 is cached, but I want to maximize concurrency and run
rdd1 and rdd2 together if rdd2 does not depend on rdd1.


Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
I see the rdd.dependencies() function, does that include ALL the
dependencies of an RDD? Is it safe to assume I can say
rdd2.dependencies.contains(rdd1)?

On Thu, Feb 26, 2015 at 4:28 PM, Corey Nolet cjno...@gmail.com wrote:

 Let's say I'm given 2 RDDs and told to store them in a sequence file and
 they have the following dependency:

 val rdd1 = sparkContext.sequenceFile().cache()
 val rdd2 = rdd1.map()


 How would I tell programmatically without being the one who built rdd1 and
 rdd2 whether or not rdd2 depends on rdd1?

 I'm working on a concurrency model for my application and I won't
 necessarily know how the two rdds are constructed. What I will know is
  whether or not rdd1 is cached, but I want to maximize concurrency and run
 rdd1 and rdd2 together if rdd2 does not depend on rdd1.




Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
Zhan,

I think it might be helpful to point out that I'm trying to run the RDDs in
different threads to maximize the amount of work that can be done
concurrently. Unfortunately, right now if I had something like this:

val rdd1 = ..cache()
val rdd2 = rdd1.map().()


future { rdd1.saveAsHadoopFile(...) }
future { rdd2.saveAsHadoopFile(...) }

The final result is that rdd1 is calculated twice. My dataset is several
100's of gigs and I cannot wait for things to be calculated multiple times.
I've tried running the saveAsHadoopFile() operations in a single
thread, but I'm finding too much time being spent with no tasks running
when I know I could be better saturating the resources of the cluster.

I'm trying to do the scheduling myself now- to determine that rdd2 depends
on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I can
do the no-op on rdd1 before I run rdd2. I would much rather the DAG figure
this out so I don't need to think about all this.
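
For reference, a minimal sketch of the pattern Zhan suggests elsewhere in this
thread (force the cached parent with a cheap action, then kick off the saves
concurrently); the input path, transforms and output paths are illustrative:

import org.apache.spark.SparkContext._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

val rdd1 = sc.textFile("hdfs:///input/path").map(line => (line, 1)).cache()
val rdd2 = rdd1.mapValues(_ + 1)

rdd1.count()  // materialize the cache once so neither save recomputes the lineage

val saves = Seq(
  Future { rdd1.saveAsSequenceFile("hdfs:///out/rdd1") },
  Future { rdd2.saveAsSequenceFile("hdfs:///out/rdd2") }
)
saves.foreach(Await.result(_, Duration.Inf))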


On Thu, Feb 26, 2015 at 5:43 PM, Zhan Zhang zzh...@hortonworks.com wrote:

 You don’t need to know rdd dependencies to maximize concurrency.
 Internally the scheduler will construct the DAG and trigger the execution
 if there are no shuffle dependencies in between RDDs.

 Thanks.

 Zhan Zhang
 On Feb 26, 2015, at 1:28 PM, Corey Nolet cjno...@gmail.com wrote:

  Let's say I'm given 2 RDDs and told to store them in a sequence file and
 they have the following dependency:
 
  val rdd1 = sparkContext.sequenceFile().cache()
  val rdd2 = rdd1.map()
 
 
  How would I tell programmatically without being the one who built rdd1
 and rdd2 whether or not rdd2 depends on rdd1?
 
  I'm working on a concurrency model for my application and I won't
 necessarily know how the two rdds are constructed. What I will know is
  whether or not rdd1 is cached, but I want to maximize concurrency and run
 rdd1 and rdd2 together if rdd2 does not depend on rdd1.
 




Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
 What confused me is the statement "The final result is that rdd1 is
calculated twice." Is it the expected behavior?

To be perfectly honest, performing an action on a cached RDD in two
different threads and having them (at the partition level) block until the
parents are cached would be the behavior myself and all my coworkers
expected.

On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet cjno...@gmail.com wrote:

 I should probably mention that my example case is much oversimplified-
 Let's say I've got a tree, a fairly complex one where I begin a series of
 jobs at the root which calculates a bunch of really really complex joins
 and as I move down the tree, I'm creating reports from the data that's
 already been joined (i've implemented logic to determine when cached items
 can be cleaned up, e.g. the last report has been done in a subtree).

 My issue is that the 'actions' on the rdds are currently being implemented
 in a single thread- even if I'm waiting on a cache to complete fully before
 I run the children jobs, I'm still in a better place than I was because
 I'm able to run those jobs concurrently- right now this is not the case.

  What you want is for a request for partition X to wait if partition X is
 already being calculated in a persisted RDD.

 I totally agree and if I could get it so that it's waiting at the
 granularity of the partition, I'd be in a much much better place. I feel
 like I'm going down a rabbit hole and working against the Spark API.


 On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote:

 To distill this a bit further, I don't think you actually want rdd2 to
 wait on rdd1 in this case. What you want is for a request for
 partition X to wait if partition X is already being calculated in a
 persisted RDD. Otherwise the first partition of rdd2 waits on the
 final partition of rdd1 even when the rest is ready.

 That is probably usually a good idea in almost all cases. That much, I
 don't know how hard it is to implement. But I speculate that it's
 easier to deal with it at that level than as a function of the
 dependency graph.

 On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet cjno...@gmail.com wrote:
  I'm trying to do the scheduling myself now- to determine that rdd2
 depends
  on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I
 can
  do the no-op on rdd1 before I run rdd2. I would much rather the DAG
 figure
  this out so I don't need to think about all this.





Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
I should probably mention that my example case is much oversimplified-
Let's say I've got a tree, a fairly complex one where I begin a series of
jobs at the root which calculates a bunch of really really complex joins
and as I move down the tree, I'm creating reports from the data that's
already been joined (i've implemented logic to determine when cached items
can be cleaned up, e.g. the last report has been done in a subtree).

My issue is that the 'actions' on the rdds are currently being implemented
in a single thread- even if I'm waiting on a cache to complete fully before
I run the children jobs, I'm still in a better place than I was because
I'm able to run those jobs concurrently- right now this is not the case.

 What you want is for a request for partition X to wait if partition X is
already being calculated in a persisted RDD.

I totally agree and if I could get it so that it's waiting at the
granularity of the partition, I'd be in a much much better place. I feel
like I'm going down a rabbit hole and working against the Spark API.


On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote:

 To distill this a bit further, I don't think you actually want rdd2 to
 wait on rdd1 in this case. What you want is for a request for
 partition X to wait if partition X is already being calculated in a
 persisted RDD. Otherwise the first partition of rdd2 waits on the
 final partition of rdd1 even when the rest is ready.

 That is probably usually a good idea in almost all cases. That much, I
 don't know how hard it is to implement. But I speculate that it's
 easier to deal with it at that level than as a function of the
 dependency graph.

 On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet cjno...@gmail.com wrote:
  I'm trying to do the scheduling myself now- to determine that rdd2
 depends
  on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I
 can
  do the no-op on rdd1 before I run rdd2. I would much rather the DAG
 figure
  this out so I don't need to think about all this.



Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
Zhan,

This is exactly what I'm trying to do except, as I mentioned in my first
message, I am being given rdd1 and rdd2 only and I don't necessarily know
at that point whether or not rdd1 is a cached rdd. Further, I don't know at
that point whether or not rdd2 depends on rdd1.

On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang zzh...@hortonworks.com wrote:

  In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to
 finish, probably due to writing to HDFS. A workaround for this particular
 case may be as follows.

  val rdd1 = ..cache()

  val rdd2 = rdd1.map().()
 rdd1.count
  future { rdd1.saveAsHadoopFile(...) }
  future { rdd2.saveAsHadoopFile(…) }

  In this way, rdd1 will be calculated once, and two saveAsHadoopFile will
 happen concurrently.

  Thanks.

  Zhan Zhang



  On Feb 26, 2015, at 3:28 PM, Corey Nolet cjno...@gmail.com wrote:

   What confused me is the statement "The final result is that rdd1
 is calculated twice." Is it the expected behavior?

  To be perfectly honest, performing an action on a cached RDD in two
 different threads and having them (at the partition level) block until the
 parents are cached would be the behavior myself and all my coworkers
 expected.

 On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet cjno...@gmail.com wrote:

  I should probably mention that my example case is much oversimplified-
 Let's say I've got a tree, a fairly complex one where I begin a series of
 jobs at the root which calculates a bunch of really really complex joins
 and as I move down the tree, I'm creating reports from the data that's
 already been joined (i've implemented logic to determine when cached items
 can be cleaned up, e.g. the last report has been done in a subtree).

  My issue is that the 'actions' on the rdds are currently being
 implemented in a single thread- even if I'm waiting on a cache to complete
 fully before I run the children jobs, I'm still in a better place than I
 was because I'm able to run those jobs concurrently- right now this is not
 the case.

  What you want is for a request for partition X to wait if partition X
 is already being calculated in a persisted RDD.

 I totally agree and if I could get it so that it's waiting at the
 granularity of the partition, I'd be in a much much better place. I feel
 like I'm going down a rabbit hole and working against the Spark API.


 On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote:

 To distill this a bit further, I don't think you actually want rdd2 to
 wait on rdd1 in this case. What you want is for a request for
 partition X to wait if partition X is already being calculated in a
 persisted RDD. Otherwise the first partition of rdd2 waits on the
 final partition of rdd1 even when the rest is ready.

 That is probably usually a good idea in almost all cases. That much, I
 don't know how hard it is to implement. But I speculate that it's
 easier to deal with it at that level than as a function of the
 dependency graph.

 On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet cjno...@gmail.com wrote:
  I'm trying to do the scheduling myself now- to determine that rdd2
 depends
  on rdd1 and rdd1 is a persistent RDD (storage level != None) so that I
 can
  do the no-op on rdd1 before I run rdd2. I would much rather the DAG
 figure
  this out so I don't need to think about all this.







Re: How to tell if one RDD depends on another

2015-02-26 Thread Corey Nolet
Ted. That one I know. It was the dependency part I was curious about
On Feb 26, 2015 7:12 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. whether or not rdd1 is a cached rdd

 RDD has getStorageLevel method which would return the RDD's current
 storage level.

 SparkContext has this method:
* Return information about what RDDs are cached, if they are in mem or
 on disk, how much space
* they take, etc.
*/
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {

 Cheers

 On Thu, Feb 26, 2015 at 4:00 PM, Corey Nolet cjno...@gmail.com wrote:

 Zhan,

 This is exactly what I'm trying to do except, as I mentioned in my first
 message, I am being given rdd1 and rdd2 only and I don't necessarily know
 at that point whether or not rdd1 is a cached rdd. Further, I don't know at
 that point whether or not rdd2 depends on rdd1.

 On Thu, Feb 26, 2015 at 6:54 PM, Zhan Zhang zzh...@hortonworks.com
 wrote:

   In this case, it is slow to wait for rdd1.saveAsHadoopFile(...) to
  finish, probably due to writing to HDFS. A workaround for this particular
  case may be as follows.

  val rdd1 = ..cache()

  val rdd2 = rdd1.map().()
 rdd1.count
   future { rdd1.saveAsHadoopFile(...) }
   future { rdd2.saveAsHadoopFile(…) }

  In this way, rdd1 will be calculated once, and two saveAsHadoopFile
 will happen concurrently.

  Thanks.

  Zhan Zhang



  On Feb 26, 2015, at 3:28 PM, Corey Nolet cjno...@gmail.com wrote:

   What confused me is the statement "The final result is that
 rdd1 is calculated twice." Is it the expected behavior?

  To be perfectly honest, performing an action on a cached RDD in two
 different threads and having them (at the partition level) block until the
 parents are cached would be the behavior myself and all my coworkers
 expected.

 On Thu, Feb 26, 2015 at 6:26 PM, Corey Nolet cjno...@gmail.com wrote:

  I should probably mention that my example case is much
 oversimplified- Let's say I've got a tree, a fairly complex one where I begin a
 series of jobs at the root which calculates a bunch of really really
 complex joins and as I move down the tree, I'm creating reports from the
 data that's already been joined (i've implemented logic to determine when
 cached items can be cleaned up, e.g. the last report has been done in a
 subtree).

  My issue is that the 'actions' on the rdds are currently being
 implemented in a single thread- even if I'm waiting on a cache to complete
 fully before I run the children jobs, I'm still in a better place than I
 was because I'm able to run those jobs concurrently- right now this is not
 the case.

  What you want is for a request for partition X to wait if partition X
 is already being calculated in a persisted RDD.

 I totally agree and if I could get it so that it's waiting at the
 granularity of the partition, I'd be in a much much better place. I feel
 like I'm going down a rabbit hole and working against the Spark API.


 On Thu, Feb 26, 2015 at 6:03 PM, Sean Owen so...@cloudera.com wrote:

 To distill this a bit further, I don't think you actually want rdd2 to
 wait on rdd1 in this case. What you want is for a request for
 partition X to wait if partition X is already being calculated in a
 persisted RDD. Otherwise the first partition of rdd2 waits on the
 final partition of rdd1 even when the rest is ready.

 That is probably usually a good idea in almost all cases. That much, I
 don't know how hard it is to implement. But I speculate that it's
 easier to deal with it at that level than as a function of the
 dependency graph.

 On Thu, Feb 26, 2015 at 10:49 PM, Corey Nolet cjno...@gmail.com
 wrote:
  I'm trying to do the scheduling myself now- to determine that rdd2
 depends
  on rdd1 and rdd1 is a persistent RDD (storage level != None) so that
 I can
  do the no-op on rdd1 before I run rdd2. I would much rather the DAG
 figure
  this out so I don't need to think about all this.









Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I'm looking @ my yarn container logs for some of the executors which appear
to be failing (with the missing shuffle files). I see exceptions that say
client.TransportClientFactory: Found inactive connection to host/ip:port,
closing it.

Right after that I see shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
connect to host/ip:port

Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

Finally, following the sigterm, I see FileNotFoundException:
/hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
such file or directory)

I'm looking @ the nodemanager and application master logs and I see no
indications whatsoever that there were any memory issues during this period
of time. The Spark UI is telling me none of the executors are really using
too much memory when this happens. It is a big job that's caching several
100's of GB but each node manager on the cluster has 64gb of ram just for
yarn containers (physical nodes have 128gb). On this cluster, we have 128
nodes. I've also tried using DISK_ONLY storage level but to no avail.

Any further ideas on how to track this down? Again, we're able to run this
same job on about 1/5th of the data just fine. The only thing that's
pointing me towards a memory issue is that it seems to be happening in the
same stages each time and when I lower the memory that each executor has
allocated it happens in earlier stages but I can't seem to find anything
that says an executor (or container for that matter) has run low on memory.



On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it's currently
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
 wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors do
 you have total? And what is the memory size for each? You can change what
 fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching with the spark.storage.memoryFraction setting.

 On Sat, Feb 21, 2015 at 2:58 PM, Petar Zecevic petar.zece...@gmail.com
 wrote:


 Could you try to turn on the external shuffle service?

 spark.shuffle.service.enabled = true


 On 21.2.2015. 17:50, Corey Nolet wrote:

 I'm experiencing the same issue. Upon closer inspection I'm noticing
 that executors are being lost as well. Thing is, I can't figure out how
 they are dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of
 memory allocated for the application. I was thinking perhaps it was
 possible that a single executor was getting a single or a couple large
 partitions but shouldn't the disk persistence kick in at that point?

 On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com
 wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

  User class threw exception: Job aborted due to stage failure: Task 450
 in stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in
 stage 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method

Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I've got the opposite problem with regards to partitioning. I've got over
6000 partitions for some of these RDDs which immediately blows the heap
somehow- I'm still not exactly sure how. If I coalesce them down to about
600-800 partitions, I get the problems where the executors are dying
without any other error messages (other than telling me the executor was
lost in the UI). If I don't coalesce, I pretty immediately get Java heap
space exceptions that kill the job altogether.

Putting in the timeouts didn't seem to help the case where I am coalescing.
Also, I don't see any differences between 'disk only' and 'memory and disk'
storage levels- both of them are having the same problems. I notice large
shuffle files (30-40gb) that only seem to spill a few hundred mb.

On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Sounds very similar to what I experienced Corey. Something that seems to
 at least help with my problems is to have more partitions. Am already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, still a long way left before the job completes but it's
 looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it's currently
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui same...@databricks.com
  wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching the
 Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could cause
 the stage to fail. Sometimes this forces the previous stage to also be
 re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines will serve out the map spill
 data, instead of the Executor JVMs (by default unless you turn external
 shuffle on, the Executor JVM itself serves out the shuffle data which
 causes problems if an Executor dies).

 Corey, how often are Executors crashing in your app? How many Executors
 do you have total? And what is the memory size for each? You can change
 what fraction of the Executor heap will be used for your user code vs the
 shuffle vs RDD caching

Re: Missing shuffle files

2015-02-23 Thread Corey Nolet
I *think* this may have been related to the default memory overhead setting
being too low. I raised the value to 1G and tried my job again but I had
to leave the office before it finished. It did get further but I'm not
exactly sure if that's just because I raised the memory. I'll see tomorrow-
but i have a suspicion this may have been the cause of the executors being
killed by the application master.
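
For anyone hitting the same thing, these are the knobs discussed in this
thread, set on the SparkConf (values are illustrative; the external shuffle
service also needs the YARN aux-service configured on the NodeManagers, as
noted below):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "1024")     // MB of off-heap headroom per executor
  .set("spark.shuffle.service.enabled", "true")          // let the NodeManager serve shuffle files
  .set("spark.akka.askTimeout", "700")                   // seconds
  .set("spark.core.connection.ack.wait.timeout", "700")  // seconds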
On Feb 23, 2015 5:25 PM, Corey Nolet cjno...@gmail.com wrote:

 I've got the opposite problem with regards to partitioning. I've got over
 6000 partitions for some of these RDDs which immediately blows the heap
 somehow- I'm still not exactly sure how. If I coalesce them down to about
 600-800 partitions, I get the problems where the executors are dying
 without any other error messages (other than telling me the executor was
 lost in the UI). If I don't coalesce, I pretty immediately get Java heap
 space exceptions that kill the job altogether.

 Putting in the timeouts didn't seem to help the case where I am
 coalescing. Also, I don't see any differences between 'disk only' and
 'memory and disk' storage levels- both of them are having the same
 problems. I notice large shuffle files (30-40gb) that only seem to spill a
 few hundred mb.

 On Mon, Feb 23, 2015 at 4:28 PM, Anders Arpteg arp...@spotify.com wrote:

 Sounds very similar to what I experienced Corey. Something that seems to
 at least help with my problems is to have more partitions. Am already
 fighting between ending up with too many partitions in the end and having
 too few in the beginning. By coalescing as late as possible and avoiding
 too few in the beginning, the problems seem to decrease. Also, increasing
 spark.akka.askTimeout and spark.core.connection.ack.wait.timeout
 significantly (~700 secs), the problems seem to almost disappear. Don't
 want to celebrate yet, still a long way left before the job completes but it's
 looking better...

 On Mon, Feb 23, 2015 at 9:54 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ my yarn container logs for some of the executors which
 appear to be failing (with the missing shuffle files). I see exceptions
 that say client.TransportClientFactory: Found inactive connection to
 host/ip:port, closing it.

 Right after that I see shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 1 outstanding blocks. java.io.IOException: Failed to
 connect to host/ip:port

 Right after that exception I see RECEIVED SIGNAL 15: SIGTERM

 Finally, following the sigterm, I see FileNotFoundException:
 /hdfs/01/yarn/nm/usercache../spark-local-uuid/shuffle_5_09_0.data (No
 such file or directory)

 I'm looking @ the nodemanager and application master logs and I see no
 indications whatsoever that there were any memory issues during this period
 of time. The Spark UI is telling me none of the executors are really using
 too much memory when this happens. It is a big job that's caching several
 100's of GB but each node manager on the cluster has 64gb of ram just for
 yarn containers (physical nodes have 128gb). On this cluster, we have 128
 nodes. I've also tried using DISK_ONLY storage level but to no avail.

 Any further ideas on how to track this down? Again, we're able to run
 this same job on about 1/5th of the data just fine. The only thing that's
 pointing me towards a memory issue is that it seems to be happening in the
 same stages each time and when I lower the memory that each executor has
 allocated it happens in earlier stages but I can't seem to find anything
 that says an executor (or container for that matter) has run low on memory.



 On Mon, Feb 23, 2015 at 9:24 AM, Anders Arpteg arp...@spotify.com
 wrote:

 No, unfortunately we're not making use of dynamic allocation or the
 external shuffle service. Hoping that we could reconfigure our cluster to
 make use of it, but since it requires changes to the cluster itself (and
 not just the Spark app), it could take some time.

 Unsure if task 450 was acting as a reducer or not, but seems possible.
 Probably due to a crashed executor as you say. Seems like I need to do some
 more advanced partition tuning to make this job work, as it's currently
 rather high number of partitions.

 Thanks for the help so far! It's certainly a frustrating task to debug
 when everything's working perfectly on sample data locally and crashes hard
 when running on the full dataset on the cluster...

 On Sun, Feb 22, 2015 at 9:27 AM, Sameer Farooqui 
 same...@databricks.com wrote:

 Do you guys have dynamic allocation turned on for YARN?

 Anders, was Task 450 in your job acting like a Reducer and fetching
 the Map spill output data from a different node?

 If a Reducer task can't read the remote data it needs, that could
 cause the stage to fail. Sometimes this forces the previous stage to also
 be re-computed if it's a wide dependency.

 But like Petar said, if you turn the external shuffle service on, YARN
 NodeManager process on the slave machines

Re: Missing shuffle files

2015-02-21 Thread Corey Nolet
I'm experiencing the same issue. Upon closer inspection I'm noticing that
executors are being lost as well. Thing is, I can't figure out how they are
dying. I'm using MEMORY_AND_DISK_SER and i've got over 1.3TB of memory
allocated for the application. I was thinking perhaps it was possible that
a single executor was getting a single or a couple large partitions but
shouldn't the disk persistence kick in at that point?

On Sat, Feb 21, 2015 at 11:20 AM, Anders Arpteg arp...@spotify.com wrote:

 For large jobs, the following error message is shown that seems to
 indicate that shuffle files for some reason are missing. It's a rather
 large job with many partitions. If the data size is reduced, the problem
 disappears. I'm running a build from Spark master post 1.2 (build at
 2015-01-16) and running on Yarn 2.2. Any idea of how to resolve this
 problem?

 User class threw exception: Job aborted due to stage failure: Task 450 in
 stage 450.1 failed 4 times, most recent failure: Lost task 450.3 in stage
 450.1 (TID 167370, lon4-hadoopslave-b77.lon4.spotify.net):
 java.io.FileNotFoundException:
 /disk/hd06/yarn/local/usercache/arpteg/appcache/application_1424333823218_21217/spark-local-20150221154811-998c/03/rdd_675_450
 (No such file or directory)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.(FileOutputStream.java:221)
  at java.io.FileOutputStream.(FileOutputStream.java:171)
  at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:76)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
  at
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
  at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:149)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:74)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:264)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
  at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  at java.lang.Thread.run(Thread.java:745)

 TIA,
 Anders




Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Corey Nolet
We've been using commons configuration to pull our properties out of
properties files and system properties (prioritizing system properties over
others) and we add those properties to our spark conf explicitly and we use
ArgoPartser to get the command line argument for which property file to
load. We also implicitly added an extra parse args method to our SparkConf.
In our main method, we do something like this:

val sparkConf = SparkConfFactory.newSparkConf.parseModuleArgs(args)
val sparkContext = new SparkContext(sparkConf)

Now all of our externally parsed properties are in the same spark conf so
we can pull them off anywhere in the program that has access to an
rdd/sparkcontext or the spark conf directly.
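
A minimal sketch of that pattern, inside the driver's main(args) (property
file path taken from the first argument; names are illustrative):

import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

// Load an arbitrary .properties file named on the command line, let system
// properties win, and copy everything onto the SparkConf so it travels with it.
val props = new Properties()
props.load(new FileInputStream(args(0)))

val sparkConf = new SparkConf().setAppName("my-module")
props.asScala.foreach { case (k, v) =>
  sparkConf.set(k, sys.props.getOrElse(k, v))
}

val sc = new SparkContext(sparkConf)
val outputDir = sc.getConf.get("job.output.dir")  // non-spark.* keys survive when set explicitly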

On Mon, Feb 16, 2015 at 10:42 AM, Sean Owen so...@cloudera.com wrote:

 How about system properties? or something like Typesafe Config which
 lets you at least override something in a built-in config file on the
 command line, with props or other files.

 On Mon, Feb 16, 2015 at 3:38 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Sean,
 
  I'm trying this as an alternative to what I currently do. Currently I
 have
  my module.properties file for my module in the resources directory, and
 that
  file is put inside the über JAR file when I build my application with
 Maven,
  and then when I submit it using spark-submit, I can read that
  module.properties file via the traditional method:
 
 
 
 properties.load(MyModule.class.getClassLoader().getResourceAsStream(module.properties));
 
  and everything works fine. The disadvantage is that in order to make any
  changes to that .properties file effective, I have to re-build my
  application. Therefore I'm trying to find a way to be able to send that
  module.properties file via spark-submit and read the values in iy, so
 that I
  will not be forced to build my application every time I want to make a
  change in the module.properties file.
 
  I've also checked the --files option of spark-submit, but I see that
 it is
  for sending the listed files to executors (correct me if I'm wrong), what
  I'm after is being able to pass dynamic properties (key/value pairs) to
 the
  Driver program of my Spark application. And I still could not find out
 how
  to do that.
 
  --
  Emre
 
 
 
 
 
  On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote:
 
  Since SparkConf is only for Spark properties, I think it will in
  general only pay attention to and preserve spark.* properties. You
  could experiment with that. In general I wouldn't rely on Spark
  mechanisms for your configuration, and you can use any config
  mechanism you like to retain your own properties.
 
  On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com
  wrote:
   Hello,
  
   I'm using Spark 1.2.1 and have a module.properties file, and in it I
   have
   non-Spark properties, as well as Spark properties, e.g.:
  
  job.output.dir=file:///home/emre/data/mymodule/out
  
   I'm trying to pass it to spark-submit via:
  
  spark-submit --class com.myModule --master local[4] --deploy-mode
   client
   --verbose --properties-file /home/emre/data/mymodule.properties
   mymodule.jar
  
   And I thought I could read the value of my non-Spark property, namely,
   job.output.dir by using:
  
   SparkConf sparkConf = new SparkConf();
   final String validatedJSONoutputDir =
   sparkConf.get(job.output.dir);
  
   But it gives me an exception:
  
   Exception in thread main java.util.NoSuchElementException:
   job.output.dir
  
   Is it not possible to mix Spark and non-Spark properties in a single
   .properties file, then pass it via --properties-file and then get the
   values
   of those non-Spark properties via SparkConf?
  
   Or is there another object / method to retrieve the values for those
   non-Spark properties?
  
  
   --
   Emre Sevinç
 
 
 
 
  --
  Emre Sevinc

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




SparkSQL doesn't seem to like $'s in column names

2015-02-13 Thread Corey Nolet
I don't remember Oracle ever enforcing that I couldn't include a $ in a
column name, but I also don't think I've ever tried.

When using sqlContext.sql(...), I have a SELECT * from myTable WHERE
locations_$homeAddress = '123 Elm St'

It's telling me $ is invalid. Is this a bug?


Re: SparkSQL doesn't seem to like $'s in column names

2015-02-13 Thread Corey Nolet
This doesn't seem to have helped.
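
(For reference, the backtick-escaped form suggested below looks something
like this, though as noted it didn't resolve the issue here; table and
column names are from the original example:)

val results = sqlContext.sql(
  "SELECT * FROM myTable WHERE `locations_$homeAddress` = '123 Elm St'")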

On Fri, Feb 13, 2015 at 2:51 PM, Michael Armbrust mich...@databricks.com
wrote:

 Try using `backticks` to escape non-standard characters.

 On Fri, Feb 13, 2015 at 11:30 AM, Corey Nolet cjno...@gmail.com wrote:

 I don't remember Oracle ever enforcing that I couldn't include a $ in a
  column name, but I also don't think I've ever tried.

 When using sqlContext.sql(...), I have a SELECT * from myTable WHERE
 locations_$homeAddress = '123 Elm St'

 It's telling me $ is invalid. Is this a bug?





Re: Boolean values as predicates in SQL string

2015-02-13 Thread Corey Nolet
Nevermind- I think I may have had a schema-related issue (sometimes
booleans were represented as strings and sometimes as raw booleans, but when
I populated the schema one or the other was chosen).



On Fri, Feb 13, 2015 at 8:03 PM, Corey Nolet cjno...@gmail.com wrote:

 Here are the results of a few different SQL strings (let's assume the
 schemas are valid for the data types used):

 SELECT * from myTable where key1 = true  - no filters are pushed to my
 PrunedFilteredScan
 SELECT * from myTable where key1 = true and key2 = 5 - 1 filter (key2) is
 pushed to my PrunedFilteredScan
 SELECT * from myTable where key1 = false and key3 = 'val3' - 1 filter
 (key3) is pushed to my PrunedFilteredScan
 SELECT * from myTable where key1 = 'false' - (as expected) it passed down
 an EqualTo Filter that's matching 'false' as a string.


 I was going to file a bug for this but before I did that I wanted to make
 sure I wasn't missing something (like a special way to handle booleans).
 I'm assuming the filter is being optimized out because it's being assigned
 the literal true but in this case I really want to match against a
 boolean datatype which is true.





Boolean values as predicates in SQL string

2015-02-13 Thread Corey Nolet
Here are the results of a few different SQL strings (let's assume the
schemas are valid for the data types used):

SELECT * from myTable where key1 = true  - no filters are pushed to my
PrunedFilteredScan
SELECT * from myTable where key1 = true and key2 = 5 - 1 filter (key2) is
pushed to my PrunedFilteredScan
SELECT * from myTable where key1 = false and key3 = 'val3' - 1 filter
(key3) is pushed to my PrunedFilteredScan
SELECT * from myTable where key1 = 'false' - (as expected) it passed down
an EqualTo Filter that's matching 'false' as a string.


I was going to file a bug for this but before I did that I wanted to make
sure I wasn't missing something (like a special way to handle booleans).
I'm assuming the filter is being optimized out because it's being assigned
the literal true but in this case I really want to match against a
boolean datatype which is true.
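
For context, a sketch of what the relation side would see if the boolean
predicate were pushed down (Spark 1.2 sources API; the column name is from
the examples above, and this only illustrates the filter shape, not a full
PrunedFilteredScan implementation):

import org.apache.spark.sql.sources.{EqualTo, Filter}

// Inside buildScan(requiredColumns, filters), a pushed boolean predicate
// would arrive as an EqualTo with a Boolean value:
def describe(filters: Array[Filter]): Unit = filters.foreach {
  case EqualTo("key1", value: Boolean) =>
    println(s"boolean predicate pushed down: key1 = $value")
  case other =>
    println(s"other pushed filter: $other")
}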


Re: Custom Kryo serializer

2015-02-12 Thread Corey Nolet
I was able to get this working by extending KryoRegistrator and setting the
spark.kryo.registrator property.
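
For anyone searching later, a minimal sketch of that wiring (the domain class
and its serializer are illustrative):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

case class Point(x: Double, y: Double)

// A custom Kryo Serializer for Point.
class PointSerializer extends Serializer[Point] {
  override def write(kryo: Kryo, out: Output, p: Point): Unit = {
    out.writeDouble(p.x)
    out.writeDouble(p.y)
  }
  override def read(kryo: Kryo, in: Input, cls: Class[Point]): Point =
    Point(in.readDouble(), in.readDouble())
}

// Wired in via a KryoRegistrator rather than registerKryoClasses().
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point], new PointSerializer)
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)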

On Thu, Feb 12, 2015 at 12:31 PM, Corey Nolet cjno...@gmail.com wrote:

 I'm trying to register a custom class that extends Kryo's Serializer
 interface. I can't tell exactly what Class the registerKryoClasses()
 function on the SparkConf is looking for.

 How do I register the Serializer class?



Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition

2015-02-12 Thread Corey Nolet
So I tried this:

.mapPartitions(itr = {
itr.grouped(300).flatMap(items = {
myFunction(items)
})
})

and I tried this:

.mapPartitions(itr = {
itr.grouped(300).flatMap(myFunction)
})

 I tried making myFunction a method, a function val, and even moving it
into a singleton object.

The closure cleaner throws Task not serializable exceptions with a distant
outer class whenever I do this.

Just to test, I tried this:

.flatMap(it = myFunction(Seq(it)))

And it worked just fine. What am I doing wrong here?

Also, my function is a little more complicated and it does take arguments
that depend on the class actually manipulating the RDD- but why would it
work fine with a single flatMap and not with mapPartitions? I am somewhat
new to Scala and maybe I'm missing something here.
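
One common way around the distant-outer-class capture, as a sketch: keep the
per-chunk work in a standalone object and pass anything it needs as explicit
arguments instead of reading fields from the enclosing class (element type
and logic here are illustrative):

import org.apache.spark.rdd.RDD

object ChunkOps extends Serializable {
  // Only `threshold` and the object itself end up in the closure.
  def process(threshold: Int)(items: Seq[String]): Iterator[String] =
    items.iterator.filter(_.length > threshold)
}

val lines: RDD[String] = sc.textFile("hdfs:///some/input")
val threshold = 5
val processed = lines.mapPartitions(_.grouped(300).flatMap(ChunkOps.process(threshold)))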

On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 No, only each group should need to fit.

 On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet cjno...@gmail.com wrote:

 Doesn't iter still need to fit entirely into memory?

 On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

 rdd.mapPartitions { iter =
   val grouped = iter.grouped(batchSize)
   for (group - grouped) { ... }
 }

 On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote:

 I think the word partition here is a tad different than the term
 partition that we use in Spark. Basically, I want something similar to
 Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
 want to run an algorithm that can be optimized by working on 30 people at a
 time, I'd like to be able to say:

 val rdd: RDD[People] = .
 val partitioned: RDD[Seq[People]] = rdd.partition(30)

 I also don't want any shuffling- everything can still be processed
 locally.


 [1]
 http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)







Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition

2015-02-12 Thread Corey Nolet
The more I'm thinking about this- I may try this instead:

val myChunkedRDD: RDD[List[Event]] = inputRDD.mapPartitions(_
.grouped(300).toList)

I wonder if this would work. I'll try it when I get back to work tomorrow.


Yuyhao, I tried your approach too but it seems to be somehow moving all the
data to a single partition (no matter what window I set) and it seems to
lock up my jobs. I waited for 15 minutes for a stage that usually takes
about 15 seconds and I finally just killed the job in yarn.
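
For the record, a version of the chunked-RDD idea above that should compile
as-is: mapPartitions wants an Iterator back, and Iterator.grouped already
yields an Iterator[Seq[Event]], so the .toList isn't needed (Event and
inputRDD as in the snippet above):

val myChunkedRDD: RDD[Seq[Event]] = inputRDD.mapPartitions(_.grouped(300))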

On Thu, Feb 12, 2015 at 4:40 PM, Corey Nolet cjno...@gmail.com wrote:

 So I tried this:

 .mapPartitions(itr = {
 itr.grouped(300).flatMap(items = {
 myFunction(items)
 })
 })

 and I tried this:

 .mapPartitions(itr = {
 itr.grouped(300).flatMap(myFunction)
 })

  I tried making myFunction a method, a function val, and even moving it
 into a singleton object.

  The closure cleaner throws Task not serializable exceptions with a distant
 outer class whenever I do this.

 Just to test, I tried this:

 .flatMap(it = myFunction(Seq(it)))

 And it worked just fine. What am I doing wrong here?

 Also, my function is a little more complicated and it does take arguments
 that depend on the class actually manipulating the RDD- but why would it
 work fine with a single flatMap and not with mapPartitions? I am somewhat
 new to Scala and maybe I'm missing something here.

 On Wed, Feb 11, 2015 at 5:59 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

 No, only each group should need to fit.

 On Wed, Feb 11, 2015 at 2:56 PM, Corey Nolet cjno...@gmail.com wrote:

 Doesn't iter still need to fit entirely into memory?

 On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

 rdd.mapPartitions { iter =
   val grouped = iter.grouped(batchSize)
   for (group - grouped) { ... }
 }

 On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote:

 I think the word partition here is a tad different than the term
 partition that we use in Spark. Basically, I want something similar to
 Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
 want to run an algorithm that can be optimized by working on 30 people at 
 a
 time, I'd like to be able to say:

 val rdd: RDD[People] = .
 val partitioned: RDD[Seq[People]] = rdd.partition(30)

 I also don't want any shuffling- everything can still be processed
 locally.


 [1]
 http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)








Using Spark SQL for temporal data

2015-02-12 Thread Corey Nolet
I have a temporal data set in which I'd like to be able to query using
Spark SQL. The dataset is actually in Accumulo and I've already written a
CatalystScan implementation and RelationProvider[1] to register with the
SQLContext so that I can apply my SQL statements.

With my current implementation, the start and stop time ranges are set on
the RelationProvider (so ultimately they become a per-table setting). I'd
much rather be able to register the table without the time ranges and just
specify them through the SQL query string itself (perhaps a expression in
the WHERE clause?)


[1]
https://github.com/calrissian/accumulo-recipes/blob/master/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStoreCatalyst.scala
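
The query shape this is aiming for would be something like the following
(column and table names are illustrative); the open question is getting those
WHERE bounds to arrive at the scan as pushed-down predicates instead of
per-table settings:

val results = sqlContext.sql(
  """SELECT * FROM events
    |WHERE startTime >= '2015-02-01T00:00:00Z'
    |  AND stopTime  <  '2015-02-08T00:00:00Z'""".stripMargin)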


Custom Kryo serializer

2015-02-12 Thread Corey Nolet
I'm trying to register a custom class that extends Kryo's Serializer
interface. I can't tell exactly what Class the registerKryoClasses()
function on the SparkConf is looking for.

How do I register the Serializer class?


Easy way to partition an RDD into chunks like Guava's Iterables.partition

2015-02-11 Thread Corey Nolet
I think the word partition here is a tad different than the term
partition that we use in Spark. Basically, I want something similar to
Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
want to run an algorithm that can be optimized by working on 30 people at a
time, I'd like to be able to say:

val rdd: RDD[People] = .
val partitioned: RDD[Seq[People]] = rdd.partition(30)

I also don't want any shuffling- everything can still be processed locally.


[1]
http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)


Re: Easy way to partition an RDD into chunks like Guava's Iterables.partition

2015-02-11 Thread Corey Nolet
Doesn't iter still need to fit entirely into memory?

On Wed, Feb 11, 2015 at 5:55 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 rdd.mapPartitions { iter =
   val grouped = iter.grouped(batchSize)
   for (group - grouped) { ... }
 }

 On Wed, Feb 11, 2015 at 2:44 PM, Corey Nolet cjno...@gmail.com wrote:

 I think the word partition here is a tad different than the term
 partition that we use in Spark. Basically, I want something similar to
 Guava's Iterables.partition [1], that is, If I have an RDD[People] and I
 want to run an algorithm that can be optimized by working on 30 people at a
 time, I'd like to be able to say:

 val rdd: RDD[People] = .
 val partitioned: RDD[Seq[People]] = rdd.partition(30)

 I also don't want any shuffling- everything can still be processed
 locally.


 [1]
 http://docs.guava-libraries.googlecode.com/git/javadoc/com/google/common/collect/Iterables.html#partition(java.lang.Iterable,%20int)





Re: Writable serialization from InputFormat losing fields

2015-02-10 Thread Corey Nolet
I am able to get around the problem by doing a map and getting the Event
out of the EventWritable before I do my collect. I think I'll  do that for
now.

On Tue, Feb 10, 2015 at 6:04 PM, Corey Nolet cjno...@gmail.com wrote:

 I am using an input format to load data from Accumulo [1] in to a Spark
 RDD. It looks like something is happening in the serialization of my output
 writable between the time it is emitted from the InputFormat and the time
 it reaches its destination on the driver.

 What's happening is that the resulting Event object [2] inside the
 EventWritable [3] appears to have lost its Tuples [4]


 [1]
 https://github.com/calrissian/accumulo-recipes/blob/master/store/event-store/src/main/java/org/calrissian/accumulorecipes/eventstore/hadoop/EventInputFormat.java
 [2]
 https://github.com/calrissian/mango/blob/master/mango-core/src/main/java/org/calrissian/mango/domain/event/Event.java
 [3]
 https://github.com/calrissian/accumulo-recipes/blob/master/commons/src/main/java/org/calrissian/accumulorecipes/commons/hadoop/EventWritable.java
 [4]
 https://github.com/calrissian/mango/blob/master/mango-core/src/main/java/org/calrissian/mango/domain/Tuple.java

 I'm at a loss. I've tested using the SerializableWritable and serializing
 an EventWritable to an ObjectOutputStream in a unit test and it serialized
 fine without loss of data. I also verified that the Event object itself
 serializes and deserializes fine with an ObjectOutputStream. I'm trying to
 follow breakpoints through the code to figure out where exactly this may be
 happening but the objects all seem to be bytes already when passed into the
 JavaSerializerInstance (if I'm properly following what's going on, that
 is).

 Any ideas on what this may be? I'm using Spark 1.2.0 and Scala 2.10 but
 the business objects I'm using are from Java 1.7.





Re: How to design a long live spark application

2015-02-05 Thread Corey Nolet
Here's another lightweight example of running a SparkContext in a common
java servlet container: https://github.com/calrissian/spark-jetty-server

On Thu, Feb 5, 2015 at 11:46 AM, Charles Feduke charles.fed...@gmail.com
wrote:

 If you want to design something like Spark shell have a look at:

 http://zeppelin-project.org/

 Its open source and may already do what you need. If not, its source code
 will be helpful in answering the questions about how to integrate with long
 running jobs that you have.


 On Thu Feb 05 2015 at 11:42:56 AM Boromir Widas vcsub...@gmail.com
 wrote:

 You can check out https://github.com/spark-jobserver/spark-jobserver -
 this allows several users to upload their jars and run jobs with a REST
 interface.

 However, if all users are using the same functionality, you can write a
 simple spray server which will act as the driver and hosts the spark
 context+RDDs, launched in client mode.

 On Thu, Feb 5, 2015 at 10:25 AM, Shuai Zheng szheng.c...@gmail.com
 wrote:

 Hi All,



 I want to develop a server side application:



 User submit request -> Server run spark application and return (this
 might take a few seconds).



 So I want to host the server to keep the long-live context, I don’t know
 whether this is reasonable or not.



 Basically I try to have a global JavaSparkContext instance and keep it
 there, and initialize some RDD. Then my java application will use it to
 submit the job.



 So now I have some questions:



 1, if I don’t close it, will there any timeout I need to configure on
 the spark server?

 2, In theory I want to design something similar to Spark shell (which
 also host a default sc there), just it is not shell based.



 Any suggestion? I think my request is very common for application
 development, here must someone has done it before?



 Regards,



 Shawn





Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Corey Nolet
My mistake Marcello, I was looking at the wrong message. That reply was
meant for bo yang.
On Feb 4, 2015 4:02 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Corey,

 On Wed, Feb 4, 2015 at 12:44 PM, Corey Nolet cjno...@gmail.com wrote:
  Another suggestion is to build Spark by yourself.
 
  I'm having trouble seeing what you mean here, Marcelo. Guava is already
  shaded to a different package for the 1.2.0 release. It shouldn't be
 causing
  conflicts.

 That wasn't my suggestion and I definitely do not recommend rebuilding
 Spark to work around these issues. :-)

 --
 Marcelo



Re: “mapreduce.job.user.classpath.first” for Spark

2015-02-04 Thread Corey Nolet
Bo yang-

I am using Spark 1.2.0 and undoubtedly there are older Guava classes which
are being picked up and serialized with the closures when they are sent
from the driver to the executors because the class serial version ids don't
match from the driver to the executors. Have you tried doing this? Guava
works fine for me when this is not the case- but as soon as a Guava class
which was changed in version 15.0 is serialized, it fails. See [1] for more
info- we did fairly extensive testing last night. I've isolated the issue
to Hadoop's really old version of Guava being picked up. Again, this is
only noticeable when classes are used from Guava 15.0 that were changed
from previous versions and those classes are being serialized on the driver
and shipped to the executors.


[1] https://github.com/calrissian/mango/issues/158

On Wed, Feb 4, 2015 at 1:31 AM, bo yang bobyan...@gmail.com wrote:

 Corey,

 Which version of Spark do you use? I am using Spark 1.2.0, and  guava
 15.0. It seems fine.

 Best,
 Bo


 On Tue, Feb 3, 2015 at 8:56 PM, M. Dale medal...@yahoo.com.invalid
 wrote:

  Try spark.yarn.user.classpath.first (see
 https://issues.apache.org/jira/browse/SPARK-2996 - only works for YARN).
 Also thread at
 http://apache-spark-user-list.1001560.n3.nabble.com/netty-on-classpath-when-using-spark-submit-td18030.html
 .

 HTH,
 Markus

 On 02/03/2015 11:20 PM, Corey Nolet wrote:

 I'm having a really bad dependency conflict right now with Guava versions
 between my Spark application in Yarn and (I believe) Hadoop's version.

  The problem is, my driver has the version of Guava which my application
 is expecting (15.0) while it appears the Spark executors that are working
 on my RDDs have a much older version (assuming it's the old version on the
 Hadoop classpath).

 Is there a property like 'mapreduce.job.user.classpath.first' that I
 can set to make sure my own classpath is established first on the executors?






“mapreduce.job.user.classpath.first” for Spark

2015-02-03 Thread Corey Nolet
I'm having a really bad dependency conflict right now with Guava versions
between my Spark application in Yarn and (I believe) Hadoop's version.

The problem is, my driver has the version of Guava which my application is
expecting (15.0) while it appears the Spark executors that are working on
my RDDs have a much older version (assuming it's the old version on the
Hadoop classpath).

Is there a property like 'mapreduce.job.user.classpath.first' that I can
set to make sure my own classpath is established first on the executors?
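
For reference, the YARN-only property mentioned in the replies above is set
like any other Spark conf. A hedged sketch- the application class, jar names,
and master are made up, and in Spark 1.3+ the analogous settings are
spark.driver.userClassPathFirst and spark.executor.userClassPathFirst:

spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.user.classpath.first=true \
  --jars guava-15.0.jar \
  --class com.example.MyApp \
  my-app.jar

The more robust long-term fix is usually to shade/relocate the conflicting
Guava packages inside the application jar so the Hadoop-provided version can't
shadow them at all.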


Long pauses after writing to sequence files

2015-01-30 Thread Corey Nolet
We have a series of spark jobs which run in succession over various cached
datasets, do small groups and transforms, and then call
saveAsSequenceFile() on them.

Each call to save as a sequence file appears to have done its work- the
task says it completed in xxx.x seconds- but then it pauses, and the
pauses are quite significant, sometimes up to 2 minutes. We are trying to
figure out what's going on during this pause- if the executors are really
still writing to the sequence files or if maybe a race condition is
happening on an executor which is causing timeouts.

Any ideas? Anyone else seen this happening?


We also tried running all the saveAsSequenceFile calls in separate futures
to see if maybe the waiting would only take 1-2 minutes total, but it looks
like the waiting still takes the sum of the amount of time it would have
taken originally (several minutes). Our job runs, in its entirety, 35
minutes and we're estimating that we're spending at least 16 minutes in
this paused state. What I haven't been able to do is figure out how to
trace through all the executors. Is there a way to do this? The event logs
in yarn don't seem to help much with this.
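
For anyone trying the same experiment, the "separate futures" variant looks
roughly like this (the element types and output paths are placeholders). Each
save becomes its own job, and the scheduler can run them concurrently within
one SparkContext when executor slots are available:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.SparkContext._  // (K, V) implicits for saveAsSequenceFile on 1.2
import org.apache.spark.rdd.RDD

def saveAllConcurrently(outputs: Seq[(RDD[(String, String)], String)]): Unit = {
  // Launch every saveAsSequenceFile call from its own future/thread...
  val futures = outputs.map { case (rdd, path) =>
    Future { rdd.saveAsSequenceFile(path) }
  }
  // ...and block until all of them have actually finished.
  Await.result(Future.sequence(futures), Duration.Inf)
}

If the total wall-clock time doesn't drop, the time is being spent inside the
save jobs themselves (or their output commit) rather than in serialized job
submission.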


Re: Partition + equivalent of MapReduce multiple outputs

2015-01-28 Thread Corey Nolet
I'm looking @ the ShuffledRDD code and it looks like there is a method
setKeyOrdering()- is this guaranteed to order everything in the partition?
I'm on Spark 1.2.0

On Wed, Jan 28, 2015 at 9:07 AM, Corey Nolet cjno...@gmail.com wrote:

 In all of the solutions I've found thus far, sorting has been done by
 collecting the partition iterator into an array and sorting the array. This
 is not going to work for my case, as the amount of data in each partition
 may not necessarily fit into memory. Any ideas?

 On Wed, Jan 28, 2015 at 1:29 AM, Corey Nolet cjno...@gmail.com wrote:

 I wanted to update this thread for others who may be looking for a
 solution to this as well. I found [1] and I'm going to investigate if this
 is a viable solution.

 [1]
 http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

 On Wed, Jan 28, 2015 at 12:51 AM, Corey Nolet cjno...@gmail.com wrote:

 I need to be able to take an input RDD[Map[String,Any]] and split it
 into several different RDDs based on some partitionable piece of the key
 (groups) and then send each partition to a separate set of files in
 different folders in HDFS.

 1) Would running the RDD through a custom partitioner be the best way to
 go about this or should I split the RDD into different RDDs and call
 saveAsHadoopFile() on each?
 2) I need the resulting partitions sorted by key- they also need to be
 written to the underlying files in sorted order.
 3) The number of keys in each partition will almost always be too big to
 fit into memory.

 Thanks.






Re: Partition + equivalent of MapReduce multiple outputs

2015-01-28 Thread Corey Nolet
I think the repartitionAndSortWithinPartitions() method in [1] may be what
I'm looking for. At least it sounds like it is. Will this method allow me to
deal with sorted partitions even when the partition doesn't fit into memory?

[1]
https://github.com/apache/spark/blob/branch-1.2/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
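
A minimal sketch of what that call looks like when wired up- the composite
(group, key) key, the partitioner, and the partition count are illustrative
assumptions, not the actual job:

import org.apache.spark.Partitioner
import org.apache.spark.SparkContext._  // ordered-RDD implicits on 1.2
import org.apache.spark.rdd.RDD

// Route records by "group" so everything destined for one output folder lands
// in the same partition; the ordering on the full (group, key) pair then sorts
// records within that partition.
class GroupPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (group: String, _) => (group.hashCode % partitions + partitions) % partitions
  }
}

def sortedByGroup(rdd: RDD[((String, String), Map[String, Any])])
    : RDD[((String, String), Map[String, Any])] =
  // The sort happens in the shuffle's spill/merge machinery, so a partition
  // does not have to fit in memory the way collecting its iterator into an
  // array would require.
  rdd.repartitionAndSortWithinPartitions(new GroupPartitioner(64))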

On Wed, Jan 28, 2015 at 9:16 AM, Corey Nolet cjno...@gmail.com wrote:

 I'm looking @ the ShuffledRDD code and it looks like there is a method
 setKeyOrdering()- is this guaranteed to order everything in the partition?
 I'm on Spark 1.2.0

 On Wed, Jan 28, 2015 at 9:07 AM, Corey Nolet cjno...@gmail.com wrote:

 In all of the solutions I've found thus far, sorting has been done by
 collecting the partition iterator into an array and sorting the array. This
 is not going to work for my case, as the amount of data in each partition
 may not necessarily fit into memory. Any ideas?

 On Wed, Jan 28, 2015 at 1:29 AM, Corey Nolet cjno...@gmail.com wrote:

 I wanted to update this thread for others who may be looking for a
 solution to this as well. I found [1] and I'm going to investigate if this
 is a viable solution.

 [1]
 http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

 On Wed, Jan 28, 2015 at 12:51 AM, Corey Nolet cjno...@gmail.com wrote:

 I need to be able to take an input RDD[Map[String,Any]] and split it
 into several different RDDs based on some partitionable piece of the key
 (groups) and then send each partition to a separate set of files in
 different folders in HDFS.

 1) Would running the RDD through a custom partitioner be the best way
 to go about this or should I split the RDD into different RDDs and call
 saveAsHadoopFile() on each?
 2) I need the resulting partitions sorted by key- they also need to be
 written to the underlying files in sorted order.
 3) The number of keys in each partition will almost always be too big
 to fit into memory.

 Thanks.







Spark 1.2.x Yarn Auxiliary Shuffle Service

2015-01-27 Thread Corey Nolet
I've read that this is supposed to be a rather significant optimization to
the shuffle system in 1.1.0, but I'm not seeing much documentation on
enabling it in YARN. I see GitHub classes for it in 1.2.0 and a property,
spark.shuffle.service.enabled, in spark-defaults.conf.

The code mentions that this is supposed to run inside the NodeManager, so
I'm assuming it needs to be wired up in yarn-site.xml under the
yarn.nodemanager.aux-services property?
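
That matches the documented setup. A hedged sketch of the NodeManager side,
per the Spark 1.2-era YARN docs (the spark-<version>-yarn-shuffle jar from the
Spark distribution also has to be on the NodeManager's classpath):

<!-- yarn-site.xml on every NodeManager -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

and on the Spark side, in spark-defaults.conf:

# tell executors to fetch shuffle blocks from the external service
spark.shuffle.service.enabled  true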


Re: Partition + equivalent of MapReduce multiple outputs

2015-01-27 Thread Corey Nolet
I wanted to update this thread for others who may be looking for a solution
to this as well. I found [1] and I'm going to investigate if this is a
viable solution.

[1]
http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

On Wed, Jan 28, 2015 at 12:51 AM, Corey Nolet cjno...@gmail.com wrote:

 I need to be able to take an input RDD[Map[String,Any]] and split it into
 several different RDDs based on some partitionable piece of the key
 (groups) and then send each partition to a separate set of files in
 different folders in HDFS.

 1) Would running the RDD through a custom partitioner be the best way to
 go about this or should I split the RDD into different RDDs and call
 saveAsHadoopFile() on each?
 2) I need the resulting partitions sorted by key- they also need to be
 written to the underlying files in sorted order.
 3) The number of keys in each partition will almost always be too big to
 fit into memory.

 Thanks.



Partition + equivalent of MapReduce multiple outputs

2015-01-27 Thread Corey Nolet
I need to be able to take an input RDD[Map[String,Any]] and split it into
several different RDDs based on some partitionable piece of the key
(groups) and then send each partition to a separate set of files in
different folders in HDFS.

1) Would running the RDD through a custom partitioner be the best way to go
about this or should I split the RDD into different RDDs and call
saveAsHadoopFile() on each?
2) I need the resulting partitions sorted by key- they also need to be
written to the underlying files in sorted order.
3) The number of keys in each partition will almost always be too big to
fit into memory.

Thanks.
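
The Stack Overflow approach referenced in the follow-up messages in this
thread boils down to overriding MultipleTextOutputFormat so the key chooses
the output folder. A rough sketch- the "group/realKey" key layout and the
String value type are assumptions for illustration:

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.SparkContext._  // pair-RDD implicits on 1.2
import org.apache.spark.rdd.RDD

// Writes each record under <basePath>/<group>/part-NNNNN, where the group is
// the prefix of the key ("group/realKey").
class GroupedOutputFormat extends MultipleTextOutputFormat[Text, Text] {
  override def generateFileNameForKeyValue(key: Text, value: Text, name: String): String = {
    val group = key.toString.split("/", 2)(0)
    group + "/" + name
  }
  // Returning null keeps the key out of the written lines.
  override def generateActualKey(key: Text, value: Text): Text = null
}

def writeByGroup(rdd: RDD[(String, String)], basePath: String): Unit =
  rdd.map { case (k, v) => (new Text(k), new Text(v)) }
    .saveAsHadoopFile(basePath, classOf[Text], classOf[Text], classOf[GroupedOutputFormat])

On its own this only covers requirement 1 (separate folders); combining it
with repartitionAndSortWithinPartitions, discussed in the replies to this
thread, is what gets sorted output without materializing a whole partition in
memory.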


Re: Spark SQL Custom Predicate Pushdown

2015-01-17 Thread Corey Nolet
I see now. It optimizes the selection semantics so that fewer columns need
to be included just to do a count(). Very nice. I did a collect() instead of
a count() just to see what would happen, and it looks like all the expected
select fields were propagated down. Thanks.





On Sat, Jan 17, 2015 at 4:29 PM, Michael Armbrust mich...@databricks.com
wrote:

 How are you running your test here?  Are you perhaps doing a .count()?

 On Sat, Jan 17, 2015 at 12:54 PM, Corey Nolet cjno...@gmail.com wrote:

 Michael,

 What I'm seeing (in Spark 1.2.0) is that the required columns being
 pushed down to the DataRelation are not the product of the SELECT clause
 but rather just the columns explicitly included in the WHERE clause.

 Examples from my testing:

 SELECT * FROM myTable -- The required columns are empty.
 SELECT key1 FROM myTable -- The required columns are empty
 SELECT * FROM myTable where key1 = 'val1' -- The required columns
 contains key1.
 SELECT key1,key2 FROM myTable where key1 = 'val1' -- The required
 columns contains key1
 SELECT key1,key2 FROM myTable where key1 = 'val1' and key2 = 'val2' --
 The required columns contains key1,key2



 I created SPARK-5296 for the OR predicate to be pushed down in some
 capacity.







 On Sat, Jan 17, 2015 at 3:38 PM, Michael Armbrust mich...@databricks.com
  wrote:

 1) The fields in the SELECT clause are not pushed down to the predicate
 pushdown API. I have many optimizations that allow fields to be filtered
 out before the resulting object is serialized on the Accumulo tablet
 server. How can I get the selection information from the execution plan?
 I'm a little hesitant to implement the data relation that allows me to see
 the logical plan because it's noted in the comments that it could change
 without warning.


 I'm not sure I understand.  The list of required columns should be
 pushed down to the data source.  Are you looking for something more
 complicated?


 2) I'm surprised to find that the predicate pushdown filters get
 completely removed when I do anything more complex in a where clause other
 than simple AND statements. Using an OR statement caused the filter array
 that was passed into the PrunedFilteredDataSource to be empty.


 This was just an initial cut at the set of predicates to push down.  We
 can add Or.  Mind opening a JIRA?






Re: Spark SQL Custom Predicate Pushdown

2015-01-17 Thread Corey Nolet
Michael,

What I'm seeing (in Spark 1.2.0) is that the required columns being pushed
down to the DataRelation are not the product of the SELECT clause but
rather just the columns explicitly included in the WHERE clause.

Examples from my testing:

SELECT * FROM myTable -- The required columns are empty.
SELECT key1 FROM myTable -- The required columns are empty
SELECT * FROM myTable where key1 = 'val1' -- The required columns contains
key1.
SELECT key1,key2 FROM myTable where key1 = 'val1' -- The required columns
contains key1
SELECT key1,key2 FROM myTable where key1 = 'val1' and key2 = 'val2' -- The
required columns contains key1,key2



I created SPARK-5296 for the OR predicate to be pushed down in some
capacity.







On Sat, Jan 17, 2015 at 3:38 PM, Michael Armbrust mich...@databricks.com
wrote:

 1) The fields in the SELECT clause are not pushed down to the predicate
 pushdown API. I have many optimizations that allow fields to be filtered
 out before the resulting object is serialized on the Accumulo tablet
 server. How can I get the selection information from the execution plan?
 I'm a little hesitant to implement the data relation that allows me to see
 the logical plan because it's noted in the comments that it could change
 without warning.


 I'm not sure I understand.  The list of required columns should be pushed
 down to the data source.  Are you looking for something more complicated?


 2) I'm surprised to find that the predicate pushdown filters get
 completely removed when I do anything more complex in a where clause other
 than simple AND statements. Using an OR statement caused the filter array
 that was passed into the PrunedFilteredDataSource to be empty.


 This was just an initial cut at the set of predicates to push down.  We
 can add Or.  Mind opening a JIRA?



Re: Spark SQL Custom Predicate Pushdown

2015-01-17 Thread Corey Nolet
I did an initial implementation. There are two assumptions I had from the
start that I was very surprised were not a part of the predicate pushdown
API:

1) The fields in the SELECT clause are not pushed down to the predicate
pushdown API. I have many optimizations that allow fields to be filtered
out before the resulting object is serialized on the Accumulo tablet
server. How can I get the selection information from the execution plan?
I'm a little hesitant to implement the data relation that allows me to see
the logical plan because it's noted in the comments that it could change
without warning.

2) I'm surprised to find that the predicate pushdown filters get completely
removed when I do anything more complex in a where clause other than simple
AND statements. Using an OR statement caused the filter array that was
passed into the PrunedFilteredDataSource to be empty.


I have an example [1] of what I'm trying to accomplish.

[1]
https://github.com/calrissian/accumulo-recipes/blob/273/thirdparty/spark/src/main/scala/org/calrissian/accumulorecipes/spark/sql/EventStore.scala#L49
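
For readers following along, the rough shape of the 1.2-era
org.apache.spark.sql.sources API under discussion is below. This is a
skeletal, illustrative relation- not the actual Accumulo EventStore code- and
the filter handling is deliberately simplified:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{EqualTo, Filter, PrunedFilteredScan}

// Skeleton of a relation that receives column pruning and predicate pushdown.
case class DocumentRelation(sqlContext: SQLContext) extends PrunedFilteredScan {

  override def schema: StructType =
    StructType(Seq(
      StructField("key1", StringType, nullable = true),
      StructField("key2", StringType, nullable = true)))

  // requiredColumns: only the columns the query plan actually needs.
  // filters: the pushable predicates; the array is implicitly ANDed together
  // (and, as of 1.2, an OR in the WHERE clause is not offered for pushdown).
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    val equalities = filters.collect { case EqualTo(attr, value) => attr -> value }
    // ...translate `equalities` into Accumulo ranges/iterators, project down to
    // `requiredColumns`, and build the resulting RDD[Row] from the scan...
    sqlContext.sparkContext.emptyRDD[Row]
  }
}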


On Fri, Jan 16, 2015 at 10:17 PM, Corey Nolet cjno...@gmail.com wrote:

 Hao,

 Thanks so much for the links! This is exactly what I'm looking for. If I
 understand correctly, I can extend PrunedFilteredScan, PrunedScan, and
 TableScan and I should be able to support all the sql semantics?

 I'm a little confused about the Array[Filter] that is used with the
 Filtered scan. I have the ability to perform pretty robust seeks in the
 underlying data sets in Accumulo. I have an inverted index and I'm able to
 do intersections as well as unions- and rich predicates which form a tree
 of alternating intersections and unions. If I understand correctly- the
 Array[Filter] is to be treated as an AND operator? Do OR operators get
 propagated through the API at all? I'm trying to do as much paring down of
 the dataset as possible on the individual tablet servers so that the data
 loaded into the spark layer is minimal- really used to perform joins,
 groupBys, sortBys and other computations that would require the relations
 to be combined in various ways.

 Thanks again for pointing me to this.



 On Fri, Jan 16, 2015 at 2:07 AM, Cheng, Hao hao.ch...@intel.com wrote:

  The Data Source API probably works for this purpose.

 It supports column pruning and predicate pushdown:


 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala



 Examples can also be found in the unit tests:


 https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources





 *From:* Corey Nolet [mailto:cjno...@gmail.com]
 *Sent:* Friday, January 16, 2015 1:51 PM
 *To:* user
 *Subject:* Spark SQL Custom Predicate Pushdown



 I have document storage services in Accumulo that I'd like to expose to
 Spark SQL. I am able to push down predicate logic to Accumulo to have it
 perform only the seeks necessary on each tablet server to grab the results
 being asked for.



 I'm interested in using Spark SQL to push those predicates down to the
 tablet servers. Where would I begin my implementation? Currently I have an
 input format which accepts a query object that gets pushed down. How
 would I extract this information from the HiveContext/SQLContext to be able
 to push this down?





Re: Creating Apache Spark-powered “As Service” applications

2015-01-16 Thread Corey Nolet
There's also an example of running a SparkContext in a java servlet
container from Calrissian: https://github.com/calrissian/spark-jetty-server

On Fri, Jan 16, 2015 at 2:31 PM, olegshirokikh o...@solver.com wrote:

 The question is about ways to create a Windows desktop-based and/or
 web-based application client that is able to connect and talk to a server
 running a Spark application (either local or on-premise cloud
 distributions) at run time.

 Any language/architecture may work. So far, I've seen two things that may
 help with this, but I'm not yet sure whether they would be the best
 alternatives or how they work:

 Spark Job Server - https://github.com/spark-jobserver/spark-jobserver -
 defines a REST API for Spark
 Hue -

 http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/
 - uses item 1)

 Any advice would be appreciated. A simple toy example program (or steps)
 that shows, e.g., how to build such a client that simply creates a Spark
 context on a local machine, reads a text file, and returns basic stats
 would be the ideal answer!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Creating-Apache-Spark-powered-As-Service-applications-tp21193.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark SQL Custom Predicate Pushdown

2015-01-16 Thread Corey Nolet
Hao,

Thanks so much for the links! This is exactly what I'm looking for. If I
understand correctly, I can extend PrunedFilteredScan, PrunedScan, and
TableScan and I should be able to support all the sql semantics?

I'm a little confused about the Array[Filter] that is used with the
Filtered scan. I have the ability to perform pretty robust seeks in the
underlying data sets in Accumulo. I have an inverted index and I'm able to
do intersections as well as unions- and rich predicates which form a tree
of alternating intersections and unions. If I understand correctly- the
Array[Filter] is to be treated as an AND operator? Do OR operators get
propagated through the API at all? I'm trying to do as much paring down of
the dataset as possible on the individual tablet servers so that the data
loaded into the spark layer is minimal- really used to perform joins,
groupBys, sortBys and other computations that would require the relations
to be combined in various ways.

Thanks again for pointing me to this.



On Fri, Jan 16, 2015 at 2:07 AM, Cheng, Hao hao.ch...@intel.com wrote:

  The Data Source API probably works for this purpose.

 It supports column pruning and predicate pushdown:


 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala



 Examples can also be found in the unit tests:


 https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/sources





 *From:* Corey Nolet [mailto:cjno...@gmail.com]
 *Sent:* Friday, January 16, 2015 1:51 PM
 *To:* user
 *Subject:* Spark SQL Custom Predicate Pushdown



 I have document storage services in Accumulo that I'd like to expose to
 Spark SQL. I am able to push down predicate logic to Accumulo to have it
 perform only the seeks necessary on each tablet server to grab the results
 being asked for.



 I'm interested in using Spark SQL to push those predicates down to the
 tablet servers. Where would I begin my implementation? Currently I have an
 input format which accepts a query object that gets pushed down. How
 would I extract this information from the HiveContext/SQLContext to be able
 to push this down?


