Re: Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
Hello, I think I may have jumped to the wrong conclusion about symlinks,
and I was able to get what I want working perfectly.

I added these two settings in my importer application:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs",
"false")

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")


Then when I read the parquet table, I set the "basePath" option to the
parent of each of the partitions, e.g.:

val df = sqlContext.read.options(Map("basePath" ->
"/path/to/table")).parquet("/path/to/table/a=*")


I also checked that the symlinks were followed the way I wanted, by
removing one of the symlinks after creating the DataFrame, and I was able
to query the DataFrame without error.
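
For anyone trying the same thing, here is a rough sketch of how a partition
could be swapped atomically once the new data is written; the paths are
illustrative, and this assumes a POSIX filesystem where rename() atomically
replaces an existing symlink:

import java.nio.file.{Files, Paths, StandardCopyOption}

// New data has already been written under a versioned directory.
val newData  = Paths.get("/path/to/versions/a=1/v2")
val tmpLink  = Paths.get("/path/to/table/.a=1.tmp")
val liveLink = Paths.get("/path/to/table/a=1")

// Build the new symlink off to the side, then rename it over the live one.
Files.deleteIfExists(tmpLink)
Files.createSymbolicLink(tmpLink, newData)
Files.move(tmpLink, liveLink, StandardCopyOption.ATOMIC_MOVE)

Readers that already resolved the old symlink keep reading the old version;
new readers pick up the new target.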

- Philip


On Fri, Apr 29, 2016 at 9:56 AM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> Hello,
>
> I have a parquet dataset, partitioned by a column 'a'. I want to take
> advantage of Spark SQL's ability to filter to the partition when you filter
> on 'a'. I also want to periodically update individual partitions without
> disrupting any jobs that are querying the data.
>
> The obvious solution was to write parquet datasets to a separate directory
> and then update a symlink to point to it. Readers resolve the symlink to
> construct the DataFrame, so that when an update occurs any jobs continue to
> read the version of the data that they started with. Old data is cleaned up
> after no jobs are using it.
>
> This strategy works fine when updating an entire top-level parquet
> database. However, it seems like Spark SQL (or parquet) cannot handle
> partition directories being symlinks (and even if it could, it probably
> wouldn't resolve those symlinks up front, so it would blow up when a symlink
> changes at runtime). For example, if you create symlinks a=1, a=2 and a=3 in
> a directory and then try to load that directory in Spark SQL, you get the
> "Conflicting partition column names detected" error.
>
> So my question is: can anyone think of another solution that meets my
> requirements (i.e., takes advantage of partitioning and performs safe
> updates of existing partitions)?
>
> Thanks!
>
> - Philip
>
>
>


Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
Hello,

I have a parquet dataset, partitioned by a column 'a'. I want to take
advantage of Spark SQL's ability to filter to the partition when you filter
on 'a'. I also want to periodically update individual partitions without
disrupting any jobs that are querying the data.

The obvious solution was to write parquet datasets to a separate directory and
then update a symlink to point to it. Readers resolve the symlink to construct
the DataFrame, so that when an update occurs any jobs continue to read the
version of the data that they started with. Old data is cleaned up after no
jobs are using it.

This strategy works fine when updating an entire top-level parquet
database. However, it seems like Spark SQL (or parquet) cannot handle
partition directories being symlinks (and even if it could, it probably
wouldn't resolve those symlinks up front, so it would blow up when a symlink
changes at runtime). For example, if you create symlinks a=1, a=2 and a=3 in a
directory and then try to load that directory in Spark SQL, you get the
"Conflicting partition column names detected" error.

So my question is: can anyone think of another solution that meets my
requirements (i.e., takes advantage of partitioning and performs safe updates
of existing partitions)?

Thanks!

- Philip


Re: Support for time column type?

2016-04-04 Thread Philip Weaver
Hmm, yeah it looks like I could use that to represent time since start of
day. I'm porting existing large SQL queries from Postgres to Spark SQL for
a quick POC, so I'd prefer not to have to make many changes to them. I'm not
sure if the CalendarIntervalType can be used as a drop-in replacement (i.e.
if all the same operators are defined for it), but I'll give it a try.
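
If CalendarIntervalType doesn't pan out, one fallback I'm considering is
deriving a time-of-day value from a TimestampType column; a minimal sketch,
assuming a DataFrame df with a timestamp column "ts":

import org.apache.spark.sql.functions._

// Seconds since midnight, derived from the timestamp column "ts".
val withTimeOfDay = df.withColumn(
  "time_of_day_secs",
  hour(col("ts")) * 3600 + minute(col("ts")) * 60 + second(col("ts")))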

- Philip


On Fri, Apr 1, 2016 at 1:33 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> There is also CalendarIntervalType.  Is that what you are looking for?
>
> On Fri, Apr 1, 2016 at 1:11 PM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> Hi, I don't see any mention of a time type in the documentation (there is
>> DateType and TimestampType, but not TimeType), and have been unable to find
>> any documentation about whether this will be supported in the future. Does
>> anyone know if this is currently supported or will be supported in the
>> future?
>>
>
>


Support for time column type?

2016-04-01 Thread Philip Weaver
Hi, I don't see any mention of a time type in the documentation (there is
DateType and TimestampType, but not TimeType), and have been unable to find
any documentation about whether this will be supported in the future. Does
anyone know if this is currently supported or will be supported in the
future?


Re: Location preferences in pyspark?

2015-10-20 Thread Philip Weaver
Thanks, we decided to try to add the support ourselves :).

On Tue, Oct 20, 2015 at 6:40 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> Yes, I don't think there is.  You can use SparkContext.parallelize() to
> make an RDD from a list. But no location preferences are supported yet.
>
> On Sat, Oct 17, 2015 at 8:42 AM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> I believe what I want is the exact functionality provided by
>> SparkContext.makeRDD in Scala. For each element in the RDD, I want specify
>> a list of preferred hosts for processing that element.
>>
>> It looks like this method only exists in Scala, and as far as I can tell
>> there is no similar functionality available in python. Is this true?
>>
>> - Philip
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Location preferences in pyspark?

2015-10-16 Thread Philip Weaver
I believe what I want is the exact functionality provided by
SparkContext.makeRDD in Scala. For each element in the RDD, I want to specify
a list of preferred hosts for processing that element.

It looks like this method only exists in Scala, and as far as I can tell
there is no similar functionality available in python. Is this true?
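
For reference, this is roughly what I mean in Scala (the values and hostnames
are made up):

// Each element is paired with its preferred hosts.
val data: Seq[(String, Seq[String])] = Seq(
  ("record-1", Seq("host1", "host2")),
  ("record-2", Seq("host3")))
val rdd = sc.makeRDD(data)   // uses the (value, preferredLocations) overload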

- Philip


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
Yes, I am sharing the cluster across many jobs, and each job only needs 8
cores (in fact, because the jobs are so small and are counting uniques, they
only get slower as you add more cores). My question is how to limit each job
to only 8 cores while keeping the entire cluster available to that
SparkContext; e.g. if I have a cluster of 128 cores, I want to limit the
SparkContext to 64 cores and each job to 8 cores, so I can run up to 8 jobs
at once.
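
For context, here is a rough sketch of the kind of setup I mean. As far as I
understand, spark.cores.max caps what the whole SparkContext takes in
coarse-grained mode, and FAIR pools give you weights and minimum shares rather
than a hard per-job core cap, which is exactly the gap I'm asking about. The
pool name and file path are made up:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.cores.max", "64")                  // cap the whole SparkContext
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

def handleRequest(path: String): Unit = new Thread(new Runnable {
  def run(): Unit = {
    // Jobs started from this thread are assigned to the "requests" pool.
    sc.setLocalProperty("spark.scheduler.pool", "requests")
    println(sc.textFile(path).count())
  }
}).start()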

On Sun, Oct 4, 2015 at 9:38 AM, Adrian Tanase <atan...@adobe.com> wrote:

> You are absolutely correct, I apologize.
>
> My understanding was that you are sharing the machine across many jobs.
> That was the context in which I was making that comment.
>
> -adrian
>
> Sent from my iPhone
>
> On 03 Oct 2015, at 07:03, Philip Weaver <philip.wea...@gmail.com> wrote:
>
> You can't really say 8 cores is not much horsepower when you have no idea
> what my use case is. That's silly.
>
> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> Forgot to mention that you could also restrict the parallelism to 4,
>> essentially using only 4 cores at any given time, however if your job is
>> complex, a stage might be broken into more than 1 task...
>>
>> Sent from my iPhone
>>
>> On 19 Sep 2015, at 08:30, Adrian Tanase <atan...@adobe.com> wrote:
>>
>> Reading through the docs it seems that with a combination of FAIR
>> scheduler and maybe pools you can get pretty far.
>>
>> However the smallest unit of scheduled work is the task so probably you
>> need to think about the parallelism of each transformation.
>>
>> I'm guessing that by increasing the level of parallelism you get many
>> smaller tasks that the scheduler can then run across the many jobs you
>> might have - as opposed to fewer, longer tasks...
>>
>> Lastly, 8 cores is not that much horsepower :)
>> You may consider running with beefier machines or a larger cluster, to
>> get at least tens of cores.
>>
>> Hope this helps,
>> -adrian
>>
>> Sent from my iPhone
>>
>> On 18 Sep 2015, at 18:37, Philip Weaver <philip.wea...@gmail.com> wrote:
>>
>> Here's a specific example of what I want to do. My Spark application is
>> running with total-executor-cores=8. A request comes in, it spawns a thread
>> to handle that request, and starts a job. That job should use only 4 cores,
>> not all 8 of the cores available to the cluster.. When the first job is
>> scheduled, it should take only 4 cores, not all 8 of the cores that are
>> available to the driver.
>>
>> Is there any way to accomplish this? This is on mesos.
>>
>> In order to support the use cases described in
>> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
>> application runs for a long time and handles requests from multiple users,
>> I believe what I'm asking about is a very important feature. One of the
>> goals is to get lower latency for each request, but if the first request
>> takes all resources and we can't guarantee any free resources for the
>> second request, then that defeats the purpose. Does that make sense?
>>
>> Thanks in advance for any advice you can provide!
>>
>> - Philip
>>
>> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com>
>> wrote:
>>
>>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>>> scheduler, so I can define a long-running application capable of executing
>>> multiple simultaneous spark jobs.
>>>
>>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>>> but I want my application to be able to take several times that in order to
>>> run multiple jobs at the same time.
>>>
>>> I suppose my question is more basic: How can I limit the number of cores
>>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>>> Spark from using more cores to load it.
>>>
>>> Does it make sense what I am trying to accomplish, and is there any way
>>> to do it?
>>>
>>> - Philip
>>>
>>>
>>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
I believe I've described my use case clearly, yet its legitimacy is being
questioned. I will assert again that if you don't understand my use
case, it really doesn't make sense to make any statement about how many
resources I should need.

And I'm sorry, but I completely disagree with your logic. Your suggestion
is not simpler. The development effort that Spark saves is what you would
have to do to parallelize an algorithm from single-threaded to 4 cores. So
the big win comes from getting to 4 cores, not taking it from 4 to 128
(though that also is nice). Everything that Spark does I could do myself,
but it would take much longer. Keep in mind I'm not just trying to reduce
the level of effort for scheduling jobs, but also scheduling the tasks
within each job, and those are both something that Spark does really well.

- Philip


On Sun, Oct 4, 2015 at 10:57 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Philip, the guy is trying to help you. Calling him silly is a bit too far.
> He might assume your problem is IO bound which might not be the case. If
> you need only 4 cores per job no matter what there is little advantage to
> use spark in my opinion because you can easily do this with just a worker
> farm that take the job and process it in a single machine. let the
> scheduler figures out which node in the farm is idled and spawns jobs on
> those until all of them are saturated. Call me silly but this seems much
> simpler.
>
> Sent from my iPhone
>
> On 3 Oct, 2015, at 12:02 am, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
> You can't really say 8 cores is not much horsepower when you have no idea
> what my use case is. That's silly.
>
> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <atan...@adobe.com> wrote:
>
>> Forgot to mention that you could also restrict the parallelism to 4,
>> essentially using only 4 cores at any given time, however if your job is
>> complex, a stage might be broken into more than 1 task...
>>
>> Sent from my iPhone
>>
>> On 19 Sep 2015, at 08:30, Adrian Tanase <atan...@adobe.com> wrote:
>>
>> Reading through the docs it seems that with a combination of FAIR
>> scheduler and maybe pools you can get pretty far.
>>
>> However the smallest unit of scheduled work is the task so probably you
>> need to think about the parallelism of each transformation.
>>
>> I'm guessing that by increasing the level of parallelism you get many
>> smaller tasks that the scheduler can then run across the many jobs you
>> might have - as opposed to fewer, longer tasks...
>>
>> Lastly, 8 cores is not that much horsepower :)
>> You may consider running with beefier machines or a larger cluster, to
>> get at least tens of cores.
>>
>> Hope this helps,
>> -adrian
>>
>> Sent from my iPhone
>>
>> On 18 Sep 2015, at 18:37, Philip Weaver <philip.wea...@gmail.com> wrote:
>>
>> Here's a specific example of what I want to do. My Spark application is
>> running with total-executor-cores=8. A request comes in, it spawns a thread
>> to handle that request, and starts a job. That job should use only 4 cores,
>> not all 8 of the cores available to the cluster.. When the first job is
>> scheduled, it should take only 4 cores, not all 8 of the cores that are
>> available to the driver.
>>
>> Is there any way to accomplish this? This is on mesos.
>>
>> In order to support the use cases described in
>> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
>> application runs for a long time and handles requests from multiple users,
>> I believe what I'm asking about is a very important feature. One of the
>> goals is to get lower latency for each request, but if the first request
>> takes all resources and we can't guarantee any free resources for the
>> second request, then that defeats the purpose. Does that make sense?
>>
>> Thanks in advance for any advice you can provide!
>>
>> - Philip
>>
>> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com>
>> wrote:
>>
>>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>>> scheduler, so I can define a long-running application capable of executing
>>> multiple simultaneous spark jobs.
>>>
>>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>>> but I want my application to be able to take several times that in order to
>>> run multiple jobs at the same time.
>>>
>>> I suppose my question is more basic: How can I limit the number of cores
>>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>>> Spark from using more cores to load it.
>>>
>>> Does it make sense what I am trying to accomplish, and is there any way
>>> to do it?
>>>
>>> - Philip
>>>
>>>
>>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
Since I'm running Spark on Mesos, to be fair I should give Mesos credit,
too! And I should also put some effort into describing more clearly what I'm
trying to accomplish. There are really three levels of scheduling
that I'm hoping to exploit:

- Scheduling in Mesos across all frameworks, where the particular type of
Spark job that I previously described is only one of many types of
frameworks.
- Scheduling of multiple jobs (maybe that's not the right terminology?)
within the same SparkContext; this SparkContext would run in a persistent
application with an API for users to submit jobs.
- Scheduling of individual tasks within a single user-submitted Spark job.

Through some brief testing, I've found that the performance of my jobs
scales almost linearly until about 8 cores, and after that the gains are
very small or sometimes even negative. These are small jobs that typically
count uniques across only about 40M rows.

This setup works very well, with one exception: when a user submits a job,
if there are no others running, then that job will take all of the cores
that the SparkContext has available to it. This is undesirable for two
reasons:
1.) As I mentioned above, the jobs don't scale beyond about 8 cores.
2.) The next submitted job will have to wait for resources to become
available.

- Philip


On Sun, Oct 4, 2015 at 2:33 PM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> I believe I've described my use case clearly, yet its legitimacy is being
> questioned. I will assert again that if you don't understand my
> use case, it really doesn't make sense to make any statement about how many
> resources I should need.
>
> And I'm sorry, but I completely disagree with your logic. Your suggestion
> is not simpler. The development effort that Spark saves is what you would
> have to do to parallelize an algorithm from single-threaded to 4 cores. So
> the big win comes from getting to 4 cores, not taking it from 4 to 128
> (though that also is nice). Everything that Spark does I could do myself,
> but it would take much longer. Keep in mind I'm not just trying to reduce
> the level of effort for scheduling jobs, but also scheduling the tasks
> within each job, and those are both something that Spark does really well.
>
> - Philip
>
>
> On Sun, Oct 4, 2015 at 10:57 AM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Philip, the guy is trying to help you. Calling him silly is a bit too
>> far. He might assume your problem is IO bound which might not be the case.
>> If you need only 4 cores per job no matter what there is little advantage
>> to use spark in my opinion because you can easily do this with just a
>> worker farm that take the job and process it in a single machine. let the
>> scheduler figures out which node in the farm is idled and spawns jobs on
>> those until all of them are saturated. Call me silly but this seems much
>> simpler.
>>
>> Sent from my iPhone
>>
>> On 3 Oct, 2015, at 12:02 am, Philip Weaver <philip.wea...@gmail.com>
>> wrote:
>>
>> You can't really say 8 cores is not much horsepower when you have no idea
>> what my use case is. That's silly.
>>
>> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <atan...@adobe.com>
>> wrote:
>>
>>> Forgot to mention that you could also restrict the parallelism to 4,
>>> essentially using only 4 cores at any given time, however if your job is
>>> complex, a stage might be broken into more than 1 task...
>>>
>>> Sent from my iPhone
>>>
>>> On 19 Sep 2015, at 08:30, Adrian Tanase <atan...@adobe.com> wrote:
>>>
>>> Reading through the docs it seems that with a combination of FAIR
>>> scheduler and maybe pools you can get pretty far.
>>>
>>> However the smallest unit of scheduled work is the task so probably you
>>> need to think about the parallelism of each transformation.
>>>
>>> I'm guessing that by increasing the level of parallelism you get many
>>> smaller tasks that the scheduler can then run across the many jobs you
>>> might have - as opposed to fewer, longer tasks...
>>>
>>> Lastly, 8 cores is not that much horsepower :)
>>> You may consider running with beefier machines or a larger cluster, to
>>> get at least tens of cores.
>>>
>>> Hope this helps,
>>> -adrian
>>>
>>> Sent from my iPhone
>>>
>>> On 18 Sep 2015, at 18:37, Philip Weaver <philip.wea...@gmail.com> wrote:
>>>
>>> Here's a specific example of what I want to do. My Spark application is
>>> running with total-executor-cores=8. A request comes in, it spawns a thread
>>> to handle th

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-02 Thread Philip Weaver
You can't really say 8 cores is not much horsepower when you have no idea
what my use case is. That's silly.

On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <atan...@adobe.com> wrote:

> Forgot to mention that you could also restrict the parallelism to 4,
> essentially using only 4 cores at any given time, however if your job is
> complex, a stage might be broken into more than 1 task...
>
> Sent from my iPhone
>
> On 19 Sep 2015, at 08:30, Adrian Tanase <atan...@adobe.com> wrote:
>
> Reading through the docs it seems that with a combination of FAIR
> scheduler and maybe pools you can get pretty far.
>
> However the smallest unit of scheduled work is the task so probably you
> need to think about the parallelism of each transformation.
>
> I'm guessing that by increasing the level of parallelism you get many
> smaller tasks that the scheduler can then run across the many jobs you
> might have - as opposed to fewer, longer tasks...
>
> Lastly, 8 cores is not that much horsepower :)
> You may consider running with beefier machines or a larger cluster, to get
> at least tens of cores.
>
> Hope this helps,
> -adrian
>
> Sent from my iPhone
>
> On 18 Sep 2015, at 18:37, Philip Weaver <philip.wea...@gmail.com> wrote:
>
> Here's a specific example of what I want to do. My Spark application is
> running with total-executor-cores=8. A request comes in, it spawns a thread
> to handle that request, and starts a job. That job should use only 4 cores,
> not all 8 of the cores available to the cluster. When the first job is
> scheduled, it should take only 4 cores, not all 8 of the cores that are
> available to the driver.
>
> Is there any way to accomplish this? This is on mesos.
>
> In order to support the use cases described in
> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
> application runs for a long time and handles requests from multiple users,
> I believe what I'm asking about is a very important feature. One of the
> goals is to get lower latency for each request, but if the first request
> takes all resources and we can't guarantee any free resources for the
> second request, then that defeats the purpose. Does that make sense?
>
> Thanks in advance for any advice you can provide!
>
> - Philip
>
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>> scheduler, so I can define a long-running application capable of executing
>> multiple simultaneous spark jobs.
>>
>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>> but I want my application to be able to take several times that in order to
>> run multiple jobs at the same time.
>>
>> I suppose my question is more basic: How can I limit the number of cores
>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>> Spark from using more cores to load it.
>>
>> Does it make sense what I am trying to accomplish, and is there any way
>> to do it?
>>
>> - Philip
>>
>>
>


Metadata in Parquet

2015-09-30 Thread Philip Weaver
Hi, I am using org.apache.spark.sql.types.Metadata to store extra
information along with each of my fields. I'd also like to store Metadata
for the entire DataFrame, not attached to any specific field. Is this
supported?
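
For reference, this is roughly how the per-field metadata gets attached today
(the field name and key are illustrative); what I'm looking for is an
equivalent hook at the DataFrame level:

import org.apache.spark.sql.types._

val fieldMeta = new MetadataBuilder().putString("unit", "seconds").build()
val schema = StructType(Seq(
  StructField("duration", DoubleType, nullable = false, metadata = fieldMeta)))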

- Philip


Re: Remove duplicate keys by always choosing first in file.

2015-09-24 Thread Philip Weaver
Oops, I didn't catch the suggestion to just use RDD.zipWithIndex, which I
forgot existed (and I've discovered I actually used it in another project!). I
will use that instead of the mapPartitionsWithIndex/zipWithIndex solution
that I posted originally.
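
A rough sketch of what I have in mind (rdd and row.key are the same
placeholders as in my original snippet, quoted below):

val firstPerKey = rdd
  .zipWithIndex()                                       // (row, global index in file order)
  .map { case (row, idx) => (row.key, (idx, row)) }
  .reduceByKey((a, b) => if (a._1 <= b._1) a else b)    // keep the row with the lowest index
  .map { case (_, (_, row)) => row }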

On Tue, Sep 22, 2015 at 9:07 AM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> The indices are definitely necessary. My first solution was just
> reduceByKey { case (v, _) => v } and that didn't work. I needed to look at
> both values and see which had the lower index.
>
> On Tue, Sep 22, 2015 at 8:54 AM, Sean Owen <so...@cloudera.com> wrote:
>
>> The point is that this only works if you already knew the file was
>> presented in order within and across partitions, which was the
>> original problem anyway. I don't think it is in general, but in
>> practice, I do imagine it's already in the expected order from
>> textFile. Maybe under the hood this ends up being ensured by
>> TextInputFormat.
>>
>> So, adding the index and sorting on it doesn't add anything.
>>
>> On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase <atan...@adobe.com> wrote:
>> > just give zipWithIndex a shot, use it early in the pipeline. I think it
>> > provides exactly the info you need, as the index is the original line
>> number
>> > in the file, not the index in the partition.
>> >
>> > Sent from my iPhone
>> >
>> > On 22 Sep 2015, at 17:50, Philip Weaver <philip.wea...@gmail.com>
>> wrote:
>> >
>> > Thanks. If textFile can be used in a way that preserves order, than
>> both the
>> > partition index and the index within each partition should be
>> consistent,
>> > right?
>> >
>> > I overcomplicated the question by asking about removing duplicates.
>> > Fundamentally I think my question is, how does one sort lines in a file
>> by
>> > line number.
>> >
>> > On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase <atan...@adobe.com>
>> wrote:
>> >>
>> >> By looking through the docs and source code, I think you can get away
>> with
>> >> rdd.zipWithIndex to get the index of each line in the file, as long as
>> you
>> >> define the parallelism upfront:
>> >> sc.textFile("README.md", 4)
>> >>
>> >> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
>> >> skimming through some tuples, hopefully this is clear enough.
>> >>
>> >> -adrian
>> >>
>> >> From: Philip Weaver
>> >> Date: Tuesday, September 22, 2015 at 3:26 AM
>> >> To: user
>> >> Subject: Remove duplicate keys by always choosing first in file.
>> >>
>> >> I am processing a single file and want to remove duplicate rows by some
>> >> key by always choosing the first row in the file for that key.
>> >>
>> >> The best solution I could come up with is to zip each row with the
>> >> partition index and local index, like this:
>> >>
>> >> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>> >>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> >> ((partitionIndex, localIndex), row)) }
>> >> }
>> >>
>> >>
>> >> And then using reduceByKey with a min ordering on the (partitionIndex,
>> >> localIndex) pair.
>> >>
>> >> First, can i count on SparkContext.textFile to read the lines in such
>> that
>> >> the partition indexes are always increasing so that the above works?
>> >>
>> >> And, is there a better way to accomplish the same effect?
>> >>
>> >> Thanks!
>> >>
>> >> - Philip
>> >>
>> >
>>
>
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
The indices are definitely necessary. My first solution was just
reduceByKey { case (v, _) => v } and that didn't work. I needed to look at
both values and see which had the lower index.

On Tue, Sep 22, 2015 at 8:54 AM, Sean Owen <so...@cloudera.com> wrote:

> The point is that this only works if you already knew the file was
> presented in order within and across partitions, which was the
> original problem anyway. I don't think it is in general, but in
> practice, I do imagine it's already in the expected order from
> textFile. Maybe under the hood this ends up being ensured by
> TextInputFormat.
>
> So, adding the index and sorting on it doesn't add anything.
>
> On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase <atan...@adobe.com> wrote:
> > just give zipWithIndex a shot, use it early in the pipeline. I think it
> > provides exactly the info you need, as the index is the original line
> number
> > in the file, not the index in the partition.
> >
> > Sent from my iPhone
> >
> > On 22 Sep 2015, at 17:50, Philip Weaver <philip.wea...@gmail.com> wrote:
> >
> > Thanks. If textFile can be used in a way that preserves order, than both
> the
> > partition index and the index within each partition should be consistent,
> > right?
> >
> > I overcomplicated the question by asking about removing duplicates.
> > Fundamentally I think my question is, how does one sort lines in a file
> by
> > line number.
> >
> > On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase <atan...@adobe.com>
> wrote:
> >>
> >> By looking through the docs and source code, I think you can get away
> with
> >> rdd.zipWithIndex to get the index of each line in the file, as long as
> you
> >> define the parallelism upfront:
> >> sc.textFile("README.md", 4)
> >>
> >> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
> >> skimming through some tuples, hopefully this is clear enough.
> >>
> >> -adrian
> >>
> >> From: Philip Weaver
> >> Date: Tuesday, September 22, 2015 at 3:26 AM
> >> To: user
> >> Subject: Remove duplicate keys by always choosing first in file.
> >>
> >> I am processing a single file and want to remove duplicate rows by some
> >> key by always choosing the first row in the file for that key.
> >>
> >> The best solution I could come up with is to zip each row with the
> >> partition index and local index, like this:
> >>
> >> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> >> ((partitionIndex, localIndex), row)) }
> >> }
> >>
> >>
> >> And then using reduceByKey with a min ordering on the (partitionIndex,
> >> localIndex) pair.
> >>
> >> First, can i count on SparkContext.textFile to read the lines in such
> that
> >> the partition indexes are always increasing so that the above works?
> >>
> >> And, is there a better way to accomplish the same effect?
> >>
> >> Thanks!
> >>
> >> - Philip
> >>
> >
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
Thanks. If textFile can be used in a way that preserves order, then both
the partition index and the index within each partition should be
consistent, right?

I overcomplicated the question by asking about removing duplicates.
Fundamentally, I think my question is: how does one sort lines in a file by
line number?

On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase <atan...@adobe.com> wrote:

> By looking through the docs and source code, I think you can get away with
> rdd.zipWithIndex to get the index of each line in the file, as long as
> you define the parallelism upfront:
> sc.textFile("README.md", 4)
>
> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
> skimming through some tuples, hopefully this is clear enough.
>
> -adrian
>
> From: Philip Weaver
> Date: Tuesday, September 22, 2015 at 3:26 AM
> To: user
> Subject: Remove duplicate keys by always choosing first in file.
>
> I am processing a single file and want to remove duplicate rows by some
> key by always choosing the first row in the file for that key.
>
> The best solution I could come up with is to zip each row with the
> partition index and local index, like this:
>
> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> ((partitionIndex, localIndex), row)) }
> }
>
>
> And then using reduceByKey with a min ordering on the (partitionIndex,
> localIndex) pair.
>
> First, can I count on SparkContext.textFile to read the lines such that
> the partition indexes are always increasing, so that the above works?
>
> And, is there a better way to accomplish the same effect?
>
> Thanks!
>
> - Philip
>
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
Hmm, I don't think that's what I want. There's no "zero value" in my use
case.

On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen <so...@cloudera.com> wrote:

> I think foldByKey is much more what you want, as it has more a notion
> of building up some result per key by encountering values serially.
> You would take the first and ignore the rest. Note that "first"
> depends on your RDD having an ordering to begin with, or else you rely
> on however it happens to be ordered after whatever operations give you
> a key-value RDD.
>
> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
> > I am processing a single file and want to remove duplicate rows by some
> key
> > by always choosing the first row in the file for that key.
> >
> > The best solution I could come up with is to zip each row with the
> partition
> > index and local index, like this:
> >
> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> > ((partitionIndex, localIndex), row)) }
> > }
> >
> >
> > And then using reduceByKey with a min ordering on the (partitionIndex,
> > localIndex) pair.
> >
> > First, can i count on SparkContext.textFile to read the lines in such
> that
> > the partition indexes are always increasing so that the above works?
> >
> > And, is there a better way to accomplish the same effect?
> >
> > Thanks!
> >
> > - Philip
> >
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
Hmm, OK, but I'm not seeing why foldByKey is more appropriate than
reduceByKey. Specifically, is foldByKey guaranteed to walk the RDD in
order, while reduceByKey is not?
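
To make sure I understand the suggestion, here's a small sketch of the
None/Some folding on a toy pair RDD; whether "first" matches file order is
exactly the part I'm unsure about:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val firstPerKey = pairs
  .mapValues(v => Option(v))
  .foldByKey(Option.empty[Int])((acc, v) => acc.orElse(v))  // keep the first value seen per key
  .flatMapValues(v => v.toSeq)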

On Mon, Sep 21, 2015 at 8:41 PM, Sean Owen <so...@cloudera.com> wrote:

> The zero value here is None. Combining None with any row should yield
> Some(row). After that, combining is a no-op for other rows.
>
> On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
> > Hmm, I don't think that's what I want. There's no "zero value" in my use
> > case.
> >
> > On Mon, Sep 21, 2015 at 8:20 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I think foldByKey is much more what you want, as it has more a notion
> >> of building up some result per key by encountering values serially.
> >> You would take the first and ignore the rest. Note that "first"
> >> depends on your RDD having an ordering to begin with, or else you rely
> >> on however it happens to be ordered after whatever operations give you
> >> a key-value RDD.
> >>
> >> On Tue, Sep 22, 2015 at 1:26 AM, Philip Weaver <philip.wea...@gmail.com
> >
> >> wrote:
> >> > I am processing a single file and want to remove duplicate rows by
> some
> >> > key
> >> > by always choosing the first row in the file for that key.
> >> >
> >> > The best solution I could come up with is to zip each row with the
> >> > partition
> >> > index and local index, like this:
> >> >
> >> > rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >> >   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> >> > ((partitionIndex, localIndex), row)) }
> >> > }
> >> >
> >> >
> >> > And then using reduceByKey with a min ordering on the (partitionIndex,
> >> > localIndex) pair.
> >> >
> >> > First, can i count on SparkContext.textFile to read the lines in such
> >> > that
> >> > the partition indexes are always increasing so that the above works?
> >> >
> >> > And, is there a better way to accomplish the same effect?
> >> >
> >> > Thanks!
> >> >
> >> > - Philip
> >> >
> >
> >
>


Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
I am processing a single file and want to remove duplicate rows by some key
by always choosing the first row in the file for that key.

The best solution I could come up with is to zip each row with the
partition index and local index, like this:

rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
  rows.zipWithIndex.map { case (row, localIndex) => (row.key,
((partitionIndex, localIndex), row)) }
}


And then using reduceByKey with a min ordering on the (partitionIndex,
localIndex) pair.
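
Concretely, if the snippet above were assigned to a val keyed, the reduceByKey
step would look something like this:

val deduped = keyed
  .reduceByKey((a, b) => if (Ordering[(Int, Int)].lteq(a._1, b._1)) a else b)
  .map { case (_, (_, row)) => row }   // drop the (partitionIndex, localIndex) bookkeeping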

First, can I count on SparkContext.textFile to read the lines such that
the partition indexes are always increasing, so that the above works?

And, is there a better way to accomplish the same effect?

Thanks!

- Philip


Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
Here's a specific example of what I want to do. My Spark application is
running with total-executor-cores=8. A request comes in, it spawns a thread
to handle that request, and starts a job. That job should use only 4 cores,
not all 8 of the cores available to the cluster. When the first job is
scheduled, it should take only 4 cores, not all 8 of the cores that are
available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
application runs for a long time and handles requests from multiple users,
I believe what I'm asking about is a very important feature. One of the
goals is to get lower latency for each request, but if the first request
takes all resources and we can't guarantee any free resources for the
second request, then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
> scheduler, so I can define a long-running application capable of executing
> multiple simultaneous spark jobs.
>
> The kind of jobs that I'm running do not benefit from more than 4 cores,
> but I want my application to be able to take several times that in order to
> run multiple jobs at the same time.
>
> I suppose my question is more basic: How can I limit the number of cores
> used to load an RDD or DataFrame? I can immediately repartition or coalesce
> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
> Spark from using more cores to load it.
>
> Does it make sense what I am trying to accomplish, and is there any way to
> do it?
>
> - Philip
>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
(whoops, redundant sentence in that first paragraph)

On Fri, Sep 18, 2015 at 8:36 AM, Philip Weaver <philip.wea...@gmail.com>
wrote:

> Here's a specific example of what I want to do. My Spark application is
> running with total-executor-cores=8. A request comes in, it spawns a thread
> to handle that request, and starts a job. That job should use only 4 cores,
> not all 8 of the cores available to the cluster. When the first job is
> scheduled, it should take only 4 cores, not all 8 of the cores that are
> available to the driver.
>
> Is there any way to accomplish this? This is on mesos.
>
> In order to support the use cases described in
> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
> application runs for a long time and handles requests from multiple users,
> I believe what I'm asking about is a very important feature. One of the
> goals is to get lower latency for each request, but if the first request
> takes all resources and we can't guarantee any free resources for the
> second request, then that defeats the purpose. Does that make sense?
>
> Thanks in advance for any advice you can provide!
>
> - Philip
>
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>> scheduler, so I can define a long-running application capable of executing
>> multiple simultaneous spark jobs.
>>
>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>> but I want my application to be able to take several times that in order to
>> run multiple jobs at the same time.
>>
>> I suppose my question is more basic: How can I limit the number of cores
>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>> Spark from using more cores to load it.
>>
>> Does it make sense what I am trying to accomplish, and is there any way
>> to do it?
>>
>> - Philip
>>
>>
>


Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
Hello, I am trying to use dynamic allocation which requires the shuffle
service. I am running Spark on mesos.

Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails
with an error like this:

Caused by: java.net.ConnectException: Connection refused: devspark1/
172.26.21.70:7337

It's not clear from the documentation if the shuffle service starts
automatically just by having it enabled, or if I need to do something else.
There are instructions for running the shuffle service in YARN, but not
mesos.

- Philip


Re: Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
Ah, I missed that, thanks!
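
For anyone else who misses it: at minimum the driver needs the two flags
below, and the shuffle service itself still has to be started separately on
each Mesos slave with the script Tim mentions.

import org.apache.spark.SparkConf

// Driver-side flags only; the external shuffle service must already be
// running on each slave (sbin/start-mesos-shuffle-service.sh).
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.enabled", "true")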

On Mon, Sep 14, 2015 at 11:45 AM, Tim Chen <t...@mesosphere.io> wrote:

> Hi Philip,
>
> I've included documentation in the Spark/Mesos doc (
> http://spark.apache.org/docs/latest/running-on-mesos.html), where you can
> start the MesosShuffleService with sbin/start-mesos-shuffle-service.sh
> script.
>
> The shuffle service needs to be started manually for Mesos on each slave
> (one way is via Marathon with unique hostname constraint), and then you
> need to enable dynamicAllocation and shuffle service flag on the driver and
> it should work.
>
> Let me know if that's not clear.
>
> Tim
>
> On Mon, Sep 14, 2015 at 11:36 AM, Philip Weaver <philip.wea...@gmail.com>
> wrote:
>
>> Hello, I am trying to use dynamic allocation which requires the shuffle
>> service. I am running Spark on mesos.
>>
>> Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails
>> with an error like this:
>>
>> Caused by: java.net.ConnectException: Connection refused: devspark1/
>> 172.26.21.70:7337
>>
>> It's not clear from the documentation if the shuffle service starts
>> automatically just by having it enabled, or if I need to do something else.
>> There are instructions for running the shuffle service in YARN, but not
>> mesos.
>>
>> - Philip
>>
>>
>


Limiting number of cores per job in multi-threaded driver.

2015-09-12 Thread Philip Weaver
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
scheduler, so I can define a long-running application capable of executing
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores,
but I want my application to be able to take several times that in order to
run multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores
used to load an RDD or DataFrame? I can immediately repartition or coalesce
my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
Spark from using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to
do it?

- Philip


Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Philip Weaver
1 minute to discover 1000s of partitions -- yes, that is what I have
observed. And I would assert that is very slow.

On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust mich...@databricks.com
wrote:

 We should not be actually scanning all of the data of all of the
 partitions, but we do need to at least list all of the available
 directories so that we can apply your predicates to the actual values that
 are present when we are deciding which files need to be read in a given
 spark job.  While this is a somewhat expensive operation, we do it in
 parallel and we cache this information when you access the same relation
 more than once.

 Can you provide a little more detail about how exactly you are accessing
 the parquet data (are you using sqlContext.read or creating persistent
 tables in the metastore?), and how long it is taking?  It would also be
 good to know how many partitions we are talking about and how much data is
 in each.  Finally, I'd like to see the stacktrace where it is hanging to
 make sure my above assertions are correct.

 We have several tables internally that have 1000s of partitions and while
 it takes ~1 minute initially to discover the metadata, after that we are
 able to query the data interactively.



 On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 anybody has any suggestions?

 On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 Is there a workaround without updating Hadoop? Would really appreciate
 if someone can explain what spark is trying to do here and what is an easy
 way to turn this off. Thanks all!

 On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...
 On Aug 21, 2015 11:24 PM, Jerrick Hoang jerrickho...@gmail.com
 wrote:

 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819'
 and hour='00';

 TungstenAggregate(key=[],
 value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ .. about 1000 partition paths go here ]

 Why does spark have to scan all partitions when the query only
 concerns with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled
 before, and I couldn't find much information about it online. What does it
 mean exactly to disable it? Are there any negative consequences to
 disabling it?
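
 For clarity, the setting in question would presumably be toggled like this
 (I haven't tested what it actually changes):

 sqlContext.setConf("spark.sql.sources.partitionDiscovery.enabled", "false")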

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Can you make some more profiling? I am wondering if the driver is
 busy with scanning the HDFS / S3.

 Like jstack pid of driver process



 And also, it’s will be great if you can paste the physical plan for
 the simple query.



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 1:46 PM
 *To:* Cheng, Hao
 *Cc:* Philip Weaver; user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot
 of partitions



 I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
 of CLs trying to speed up spark sql with tables with a huge number of
 partitions, I've made sure that those CLs are included but it's still 
 very
 slow



 On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao hao.ch...@intel.com
 wrote:

 Yes, you can try set the
 spark.sql.sources.partitionDiscovery.enabled to false.



 BTW, which version are you using?



 Hao



 *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
 *Sent:* Thursday, August 20, 2015 12:16 PM
 *To:* Philip Weaver
 *Cc:* user
 *Subject:* Re: Spark Sql behaves strangely with tables with a lot
 of partitions



 I guess the question is why does spark have to do partition
 discovery with all partitions when the query only needs to look at one
 partition? Is there a conf flag to turn this off?



 On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver 
 philip.wea...@gmail.com wrote:

 I've had the same problem. It turns out that Spark (specifically
 parquet) is very slow at partition discovery. It got better in 1.5 (not 
 yet
 released), but was still unacceptably slow. Sadly, we ended up reading
 parquet files manually in Python (via C++) and had to abandon Spark SQL
 because of this problem.



 On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
 jerrickho...@gmail.com wrote:

 Hi all,



 I did a simple experiment with Spark SQL. I created a partitioned
 parquet table with only one partition (date=20140701). A simple `select
 count(*) from table where date=20140701` would run very fast (0.1 
 seconds).
 However, as I

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Philip Weaver
I've had the same problem. It turns out that Spark (specifically parquet)
is very slow at partition discovery. It got better in 1.5 (not yet
released), but was still unacceptably slow. Sadly, we ended up reading
parquet files manually in Python (via C++) and had to abandon Spark SQL
because of this problem.

On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 Hi all,

 I did a simple experiment with Spark SQL. I created a partitioned parquet
 table with only one partition (date=20140701). A simple `select count(*)
 from table where date=20140701` would run very fast (0.1 seconds). However,
 as I added more partitions the query takes longer and longer. When I added
 about 10,000 partitions, the query took way too long. I feel like querying
 for a single partition should not be affected by having more partitions. Is
 this a known behaviour? What does spark try to do here?

 Thanks,
 Jerrick



Re: Driver staggering task launch times

2015-08-14 Thread Philip Weaver
Ah, never mind; I don't know anything about scheduling tasks in YARN.

On Thu, Aug 13, 2015 at 11:03 PM, Ara Vartanian arav...@cs.wisc.edu wrote:

 I’m running on Yarn.

 On Aug 13, 2015, at 10:58 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 Are you running on mesos, yarn or standalone? If you're on mesos, are you
 using coarse grain or fine grained mode?

 On Thu, Aug 13, 2015 at 10:13 PM, Ara Vartanian arav...@cs.wisc.edu
 wrote:

 I’m observing an unusual situation where my step duration increases as I
 add further executors to my cluster. My algorithm is fully data
 parallelizable into a map phase, followed by a reduce step at the end that
 amounts to matrix addition. So I’ve kicked a cluster of, say, 100 executors
 with 4 cores per executor and before running the algorithm I’ve
 repartitioned the RDD into 400 partitions. I can see in the Spark UI that
 each of the 400 (map) tasks takes about 2 seconds. However, the entire step
 is taking over a minute, and this is because the launch times of the tasks
 as reported in the Spark UI are staggered. For example, the first 100 might
 be launched in the same second, then another group 3 seconds later, and so
 forth (with the durations slowly expanding). With a task time of 2 seconds,
 this “launch lag” is dominating the computation time and only gets worse as
 I add nodes.

 Any insight on how I could go about diagnosing this would be greatly
 appreciated.



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Write to cassandra...each individual statement

2015-08-13 Thread Philip Weaver
All you'd need to do is *transform* the rdd before writing it, e.g. using
the .map function.
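
Something along these lines, assuming the spark-cassandra-connector's
saveToCassandra; the keyspace/table names and the etl function are
placeholders:

import com.datastax.spark.connector._

rdd
  .map(message => etl(message))                // etl: your per-message transformation / DB check
  .saveToCassandra("my_keyspace", "my_table")  // keyspace and table are placeholders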


On Thu, Aug 13, 2015 at 11:30 AM, Priya Ch learnings.chitt...@gmail.com
wrote:

 Hi All,

  I have a question about writing an RDD to Cassandra. Instead of writing the
 entire RDD to Cassandra, I want to write individual statements into Cassandra
 because there is a need to perform ETL on each message (which requires
 checking with the DB).
 How could I insert statements individually? Using
 CassandraConnector.session?

 If so, what is the performance impact of this? How about using
 sc.parallelize() for each message in the RDD and then inserting into Cassandra?

 Thanks,
 Padma Ch



Re: Write to cassandra...each individual statement

2015-08-13 Thread Philip Weaver
So you need some state between messages in a partition. You can use
mapPartitions or foreachPartition, which allow you to write code to process
an entire partition.
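
Roughly like this; the message field and the two handlers are placeholders
for your own logic:

rdd.foreachPartition { messages =>
  // State carried across messages within one partition.
  val seen = scala.collection.mutable.Set[String]()
  messages.foreach { msg =>
    // First occurrence of a key is treated as issuance, later ones as updates.
    if (seen.add(msg.ticketId)) handleIssuance(msg)
    else handleUpdate(msg)
  }
}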

On Thu, Aug 13, 2015 at 11:48 AM, Priya Ch learnings.chitt...@gmail.com
wrote:

 Hi Philip,

  I have the following requirement -
 I read streams of data from various partitions of a Kafka topic. I then
 union the DStreams and apply a hash partitioner so that messages with the
 same key go into a single partition of an RDD, which is of course handled by
 a single thread. This is how we are trying to resolve the concurrency issue.

 Now one of the partitions of the RDD holds messages with the same key. Let's
 say the 1st message in the partition corresponds to ticket issuance and the
 2nd message corresponds to an update on the ticket. Handling the 1st message
 requires different logic, and the 2nd message's logic depends on the 1st
 message.
 Hence, using rdd.foreach I am handling different logic for individual
 messages. A bulk rdd.saveToCassandra will not work.

 Hope you got what I am trying to say.

 On Fri, Aug 14, 2015 at 12:07 AM, Philip Weaver philip.wea...@gmail.com
 wrote:

 All you'd need to do is *transform* the rdd before writing it, e.g.
 using the .map function.


 On Thu, Aug 13, 2015 at 11:30 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:

 Hi All,

  I have a question about writing an RDD to Cassandra. Instead of writing
 the entire RDD to Cassandra, I want to write individual statements into
 Cassandra because there is a need to perform ETL on each message (which
 requires checking with the DB).
 How could I insert statements individually? Using
 CassandraConnector.session?

 If so, what is the performance impact of this? How about using
 sc.parallelize() for each message in the RDD and then inserting into Cassandra?

 Thanks,
 Padma Ch






Re: Driver staggering task launch times

2015-08-13 Thread Philip Weaver
Are you running on Mesos, YARN, or standalone? If you're on Mesos, are you
using coarse-grained or fine-grained mode?

On Thu, Aug 13, 2015 at 10:13 PM, Ara Vartanian arav...@cs.wisc.edu wrote:

 I’m observing an unusual situation where my step duration increases as I
 add further executors to my cluster. My algorithm is fully data
 parallelizable into a map phase, followed by a reduce step at the end that
 amounts to matrix addition. So I’ve kicked a cluster of, say, 100 executors
 with 4 cores per executor and before running the algorithm I’ve
 repartitioned the RDD into 400 partitions. I can see in the Spark UI that
 each of the 400 (map) tasks takes about 2 seconds. However, the entire step
 is taking over a minute, and this is because the launch times of the tasks
 as reported in the Spark UI are staggered. For example, the first 100 might
 be launched in the same second, then another group 3 seconds later, and so
 forth (with the durations slowly expanding). With a task time of 2 seconds,
 this “launch lag” is dominating the computation time and only gets worse as
 I add nodes.

 Any insight on how I could go about diagnosing this would be greatly
 appreciated.



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: grouping by a partitioned key

2015-08-12 Thread Philip Weaver
Yes, I am partitioning using DataFrameWriter.partitionBy, which produces the
keyed directory structure that you referenced in that link.
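
The hybrid approach I describe further down in this thread would look roughly
like this at the RDD level; the key column index is illustrative, and it only
works because each key lives in exactly one partition:

val totalUniques = df.rdd
  .mapPartitions { rows =>
    // Count distinct key values seen within this partition only.
    val keys = scala.collection.mutable.HashSet[Any]()
    rows.foreach(row => keys += row.get(0))   // column 0 holds the key (illustrative)
    Iterator(keys.size.toLong)
  }
  .reduce(_ + _)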

On Tue, Aug 11, 2015 at 11:54 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 As far as I know, Spark SQL cannot process data on a per-partition-basis.
 DataFrame.foreachPartition is the way.

 I haven't tried it, but, following looks like a not-so-sophisticated way
 of making spark sql partition aware.


 http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


 On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver philip.wea...@gmail.com
 wrote:

 Thanks.

 In my particular case, I am calculating a distinct count on a key that is
 unique to each partition, so I want to calculate the distinct count within
 each partition, and then sum those. This approach will avoid moving the
 sets of that key around between nodes, which would be very expensive.

 Currently, to accomplish this we are manually reading in the parquet
 files (not through Spark SQL), using a bitset to calculate the unique count
 within each partition, and accumulating that sum. Doing this through Spark
 SQL would be nice, but the naive SELECT count(distinct ...) approach
 takes 60 times as long :). The approach I mentioned above might be an
 acceptable hybrid solution.

 - Philip


 On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru
 wrote:

 Philip,

 If all data per key are inside just one partition, then Spark will
 figure that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when
 you might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru









Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-11 Thread Philip Weaver
Do you think it might be faster to put all the files in one directory but
still partitioned the same way? I don't actually need to filter on the
values of the partition keys, but I need to rely on there being no overlap in
the values of the keys between any two parquet files.

On Fri, Aug 7, 2015 at 8:23 AM, Philip Weaver philip.wea...@gmail.com
wrote:

 Thanks, I also confirmed that the partition discovery is slow by writing a
 non-Spark application that uses the parquet library directly to load the
 partitions.

 It's so slow that my colleague's Python application can read the entire
 contents of all the parquet data files faster than my application can even
 discover the partitions!

 On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian lian.cs@gmail.com wrote:

 However, it's weird that the partition discovery job only spawns 2 tasks.
 It should use the default parallelism, which is probably 8 according to the
 logs of the next Parquet reading job. Partition discovery is already done
 in a distributed manner via a Spark job. But the parallelism is
 mysteriously low...

 Cheng


 On 8/7/15 3:32 PM, Cheng Lian wrote:

 Hi Philip,

 Thanks for providing the log file. It seems that most of the time is
 spent on partition discovery. The code snippet you provided actually issues
 two jobs. The first one is for listing the input directories to find out
 all leaf directories (and this actually requires listing all leaf files,
 because we can only assert that a directory is a leaf one when it contains
 no sub-directories). Then partition information is extracted from leaf
 directory paths. This process starts at:

 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and
 directories in parallel under:
 file:/home/pweaver/work/parquet/day=20150225, …

 and ends at:

 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
 tasks have all completed, from pool

 The actual tasks execution time is about 36s:

 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
 …
 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0
 (TID 0) in 36107 ms on lindevspark5 (1/2)

 You mentioned that your dataset has about 40,000+ partitions, so there
 are a lot of leaf directories and files out there. My guess is that the
 local file system spent lots of time listing FileStatus-es of all these
 files.

 I also noticed that Mesos job scheduling takes more time than expected.
 It is probably because this is the first Spark job executed in the
 application, and the system is not warmed up yet. For example, there’s a 6s
 gap between these two adjacent lines:

 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2
 tasks
 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
 TASK_RUNNING

 The 2nd Spark job is the real Parquet reading job, and this one actually
 finishes pretty quickly, only 3s (note that the Mesos job scheduling
 latency is also included):

 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at
 App.scala:182) with 8 output partitions
 …
 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0
 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0
 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0
 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
 …
 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0
 (TID 8) in 1527 ms on lindevspark4 (6/8)
 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0
 (TID 6) in 1533 ms on lindevspark4 (7/8)
 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0
 (TID 9) in 2886 ms on lindevspark5 (8/8)

 That might be the reason why you observed that the C parquet library you
 mentioned (is it parquet-cpp?) is an order of magnitude faster?

 Cheng

 On 8/7/15 2:02 AM, Philip Weaver wrote:

 With DEBUG, the log output was over 10MB, so I opted for just INFO
 output. The (sanitized) log is attached.

 The driver is essentially this code:

 info("A")

 val t = System.currentTimeMillis
 val df = sqlContext.read.parquet(dir).select(...).cache

 val elapsed = System.currentTimeMillis - t
 info(s"Init time: ${elapsed} ms")

 We've also observed that it is very slow to read the contents of the
 parquet files. My colleague wrote a PySpark application that gets the list
 of files, parallelizes it, maps across it and reads each file manually
 using a C parquet library, and aggregates manually in the loop. Ignoring
 the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
 query in Scala, his is an order of magnitude faster. Since he is
 parallelizing the work through Spark, and that isn't causing any
 performance issues, it seems to be a problem with the parquet reader. I may
 try to do what he did to construct a DataFrame manually, and see if I can
 query it with Spark SQL with reasonable performance.

grouping by a partitioned key

2015-08-11 Thread Philip Weaver
If I have an RDD that happens to already be partitioned by a key, how
efficient can I expect a groupBy operation to be? I would expect that Spark
shouldn't have to move data around between nodes, and simply will have a
small amount of work just checking the partitions to discover that it
doesn't need to move anything around.

Now, what if we're talking about a parquet database created by using
DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
by a key that I'm already partitioned by?

- Philip


Re: grouping by a partitioned key

2015-08-11 Thread Philip Weaver
Thanks.

In my particular case, I am calculating a distinct count on a key that is
unique to each partition, so I want to calculate the distinct count within
each partition, and then sum those. This approach will avoid moving the
sets of that key around between nodes, which would be very expensive.

Currently, to accomplish this we are manually reading in the parquet files
(not through Spark SQL), using a bitset to calculate the unique count
within each partition, and accumulating that sum. Doing this through Spark
SQL would be nice, but the naive SELECT count(distinct ...) approach
takes 60 times as long :). The approach I mentioned above might be an
acceptable hybrid solution.
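
For illustration, a rough sketch of that hybrid approach using mapPartitions
(the RDD name and key field are made up, and a mutable HashSet stands in for
the bitset):

import scala.collection.mutable

// Count distinct keys inside each partition, then add up the per-partition
// counts. This only works because each key value lives in a single partition.
val perPartitionCounts = records.mapPartitions { rows =>
  val seen = mutable.HashSet.empty[Long]
  rows.foreach(row => seen += row.keyField)   // keyField is a placeholder
  Iterator.single(seen.size.toLong)
}
val totalDistinct = perPartitionCounts.reduce(_ + _)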

- Philip


On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru wrote:

 Philip,

 If all data per key are inside just one partition, then Spark will figure
 that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when you
 might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru







Re: Spark failed while trying to read parquet files

2015-08-07 Thread Philip Weaver
Yes, NullPointerExceptions are pretty common in Spark (or, rather, I seem
to encounter them a lot!) but can occur for a few different reasons. Could
you add some more detail, like what the schema is for the data, or the code
you're using to read it?

On Fri, Aug 7, 2015 at 3:20 PM, Jerrick Hoang jerrickho...@gmail.com
wrote:

 Hi all,

 I have a partitioned parquet table (very small table with only 2
 partitions). The version of spark is 1.4.1, parquet version is 1.7.0. I
 applied this patch to spark [SPARK-7743] so I assume that spark can read
 parquet files normally, however, I'm getting this when trying to do a
 simple `select count(*) from table`,

 ```org.apache.spark.SparkException: Job aborted due to stage failure: Task
 29 in stage 44.0 failed 15 times, most recent failure: Lost task 29.14 in
 stage 44.0: java.lang.NullPointerException
 at
 parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:249)
 at
 parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:543)
 at
 parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:520)
 at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:426)
 at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:381)
 at
 parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:155)
 at
 parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
 at
 org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.init(SqlNewHadoopRDD.scala:153)
 at
 org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
 at
 org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)```

 Has anybody seen this before?

 Thanks



Re: How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Philip Weaver
If the object cannot be serialized, then I don't think broadcast will make
it magically serializable. You can't transfer data structures between nodes
without serializing them somehow.
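
A common workaround, sketched below, is to construct the object on the
executors instead of shipping it from the driver; class C, its constructor
argument and the process method are stand-ins here:

// Only the (serializable) constructor arguments are captured by the closure;
// the non-serializable object itself is built locally, once per partition.
val config = "some-serializable-config"   // stand-in for whatever C needs

val result = rdd.mapPartitions { rows =>
  val c = new C(config)                  // created on the executor
  rows.map(row => c.process(row))        // process is a stand-in method name
}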

On Fri, Aug 7, 2015 at 7:31 AM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hi Hao,

 I think sc.broadcast will allow you to broadcast non-serializable objects.
 According to the scaladocs the Broadcast class itself is Serializable and
 it wraps your object, allowing you to get it from the Broadcast object
 using value().

 Not 100% sure though since I haven't tried broadcasting custom objects but
 maybe worth trying unless you have already and failed.

 -sujit


 On Fri, Aug 7, 2015 at 2:39 AM, Hao Ren inv...@gmail.com wrote:

 Is there any workaround to distribute non-serializable object for RDD
 transformation or broadcast variable ?

 Say I have an object of class C which is not serializable. Class C is in
 a jar package, I have no control on it. Now I need to distribute it either
 by rdd transformation or by broadcast.

 I tried to subclass the class C with Serializable interface. It works for
 serialization, but deserialization does not work, since there are no
 parameter-less constructor for the class C and deserialization is broken
 with an invalid constructor exception.

 I think it's a common use case. Any help is appreciated.

 --
 Hao Ren

 Data Engineer @ leboncoin

 Paris, France





Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Philip Weaver
Thanks, I also confirmed that the partition discovery is slow by writing a
non-Spark application that uses the parquet library directly to load the
partitions.

It's so slow that my colleague's Python application can read the entire
contents of all the parquet data files faster than my application can even
discover the partitions!

On Fri, Aug 7, 2015 at 2:09 AM, Cheng Lian lian.cs@gmail.com wrote:

 However, it's weird that the partition discovery job only spawns 2 tasks.
 It should use the default parallelism, which is probably 8 according to the
 logs of the next Parquet reading job. Partition discovery is already done
 in a distributed manner via a Spark job. But the parallelism is
 mysteriously low...

 Cheng


 On 8/7/15 3:32 PM, Cheng Lian wrote:

 Hi Philip,

 Thanks for providing the log file. It seems that most of the time is
 spent on partition discovery. The code snippet you provided actually issues
 two jobs. The first one is for listing the input directories to find out
 all leaf directories (and this actually requires listing all leaf files,
 because we can only assert that a directory is a leaf one when it contains
 no sub-directories). Then partition information is extracted from leaf
 directory paths. This process starts at:

 10:51:44 INFO sources.HadoopFsRelation: Listing leaf files and directories
 in parallel under: file:/home/pweaver/work/parquet/day=20150225, …

 and ends at:

 10:52:31 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose
 tasks have all completed, from pool

 The actual tasks execution time is about 36s:

 10:51:54 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0
 (TID 0, lindevspark5, PROCESS_LOCAL, 3087 bytes)
 …
 10:52:30 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0
 (TID 0) in 36107 ms on lindevspark5 (1/2)

 You mentioned that your dataset has about 40,000+ partitions, so there are
 a lot of leaf directories and files out there. My guess is that the local
 file system spent lots of time listing FileStatus-es of all these files.

 I also noticed that Mesos job scheduling takes more time than expected. It
 is probably because this is the first Spark job executed in the
 application, and the system is not warmed up yet. For example, there’s a 6s
 gap between these two adjacent lines:

 10:51:45 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
 10:51:51 INFO mesos.CoarseMesosSchedulerBackend: Mesos task 0 is now
 TASK_RUNNING

 The 2nd Spark job is the real Parquet reading job, and this one actually
 finishes pretty quickly, only 3s (note that the Mesos job scheduling
 latency is also included):

 10:52:32 INFO scheduler.DAGScheduler: Got job 1 (parquet at App.scala:182)
 with 8 output partitions
 …
 10:52:32 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0
 (TID 2, lindevspark4, PROCESS_LOCAL, 2058 bytes)
 10:52:32 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0
 (TID 3, lindevspark5, PROCESS_LOCAL, 2058 bytes)
 10:52:32 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0
 (TID 4, lindevspark4, PROCESS_LOCAL, 2058 bytes)
 …
 10:52:34 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 1.0
 (TID 8) in 1527 ms on lindevspark4 (6/8)
 10:52:34 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0
 (TID 6) in 1533 ms on lindevspark4 (7/8)
 10:52:35 INFO scheduler.TaskSetManager: Finished task 7.0 in stage 1.0
 (TID 9) in 2886 ms on lindevspark5 (8/8)

 That might be the reason why you observed that the C parquet library you
 mentioned (is it parquet-cpp?) is an order of magnitude faster?

 Cheng

 On 8/7/15 2:02 AM, Philip Weaver wrote:

 With DEBUG, the log output was over 10MB, so I opted for just INFO output.
 The (sanitized) log is attached.

 The driver is essentially this code:

 info("A")

 val t = System.currentTimeMillis
 val df = sqlContext.read.parquet(dir).select(...).cache

 val elapsed = System.currentTimeMillis - t
 info(s"Init time: ${elapsed} ms")

 We've also observed that it is very slow to read the contents of the
 parquet files. My colleague wrote a PySpark application that gets the list
 of files, parallelizes it, maps across it and reads each file manually
 using a C parquet library, and aggregates manually in the loop. Ignoring
 the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
 query in Scala, his is an order of magnitude faster. Since he is
 parallelizing the work through Spark, and that isn't causing any
 performance issues, it seems to be a problem with the parquet reader. I may
 try to do what he did to construct a DataFrame manually, and see if I can
 query it with Spark SQL with reasonable performance.

 - Philip


 On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian  lian.cs@gmail.com
 lian.cs@gmail.com wrote:

 Would you mind to provide the driver log?


 On 8/6/15 3:58 PM, Philip Weaver wrote:

 I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
 again

Re: Unable to persist RDD to HDFS

2015-08-06 Thread Philip Weaver
This isn't really a Spark question. You're trying to parse a string to an
integer, but it contains an invalid character. The exception message
explains this.
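
If you want the job to survive the bad rows instead, one option (sketched here;
the field index is only illustrative) is to parse defensively and drop records
that don't parse:

import scala.util.Try

// One bad value (e.g. "3g") no longer fails the whole job; unparseable rows
// are silently dropped.
val parsedField = rowStructText
  .map(_.split("\t"))
  .flatMap(fields => Try(fields(4).replaceAll("\"", "").toInt).toOption)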

On Wed, Aug 5, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Code:
 import java.text.SimpleDateFormat
 import java.util.Calendar
 import java.sql.Date
 import org.apache.spark.storage.StorageLevel

 def formatStringAsDate(dateStr: String) = new java.sql.Date(new
 SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)

 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
 f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
 f12: Integer, f13: Integer, f14: String)


 val rowStructText =
 sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz")

 val summary = rowStructText.filter(s => s.length != 1).map(s =>
 s.split("\t")).map(
 {
 s =>
 Summary(formatStringAsDate(s(0)),
 s(1).replaceAll("\"", "").toLong,
 s(3).replaceAll("\"", "").toLong,
 s(4).replaceAll("\"", "").toInt,
 s(5).replaceAll("\"", ""),
 s(6).replaceAll("\"", "").toInt,
 formatStringAsDate(s(7)),
 formatStringAsDate(s(8)),
 s(9).replaceAll("\"", "").toInt,
 s(10).replaceAll("\"", "").toInt,
 s(11).replaceAll("\"", "").toFloat,
 s(12).replaceAll("\"", "").toInt,
 s(13).replaceAll("\"", "").toInt,
 s(14).replaceAll("\"", "")
 )
 }
 )

 summary.saveAsTextFile("sparkO")


 Output:
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.sql.Date import org.apache.spark.storage.StorageLevel
 formatStringAsDate: (dateStr: String)java.sql.Date defined class Summary
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz
 MapPartitionsRDD[639] at textFile at console:305 summary:
 org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[642] at map at
 console:310 org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 0 in stage 147.0 failed 4 times, most recent failure: Lost
 task 0.3 in stage 147.0 (TID 3396, datanode-6-3486.phx01.dev.ebayc3.com):
 java.lang.NumberFormatException: For input string: 3g at
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 at java.lang.Integer.parseInt(Integer.java:580) at
 java.lang.Integer.parseInt(Integer.java:615) at
 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at
 scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(console:318)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(console:312)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1072)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at
 org.apache.spark.scheduler.Task.run(Task.scala:64) at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)



 OR
 summary.count throws same exception

 Any suggestions ?

 --
 Deepak




Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
With DEBUG, the log output was over 10MB, so I opted for just INFO output.
The (sanitized) log is attached.

The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")

We've also observed that it is very slow to read the contents of the
parquet files. My colleague wrote a PySpark application that gets the list
of files, parallelizes it, maps across it and reads each file manually
using a C parquet library, and aggregates manually in the loop. Ignoring
the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
query in Scala, his is an order of magnitude faster. Since he is
parallelizing the work through Spark, and that isn't causing any
performance issues, it seems to be a problem with the parquet reader. I may
try to do what he did to construct a DataFrame manually, and see if I can
query it with Spark SQL with reasonable performance.

- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com wrote:

 Would you mind to provide the driver log?


 On 8/6/15 3:58 PM, Philip Weaver wrote:

 I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
 again.

 The initialization time is about 1 minute now, which is still pretty
 terrible.

 On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 Absolutely, thanks!

 On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian  lian.cs@gmail.com
 lian.cs@gmail.com wrote:

 We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

 Could you give it a shot to see whether it helps in your case? We've
 observed ~50x performance boost with schema merging turned on.

 Cheng


 On 8/6/15 8:26 AM, Philip Weaver wrote:

 I have a parquet directory that was produced by partitioning by two
 keys, e.g. like this:

 df.write.partitionBy("a", "b").parquet("asdf")


 There are 35 values of a, and about 1100-1200 values of b for each
 value of a, for a total of over 40,000 partitions.

 Before running any transformations or actions on the DataFrame, just
 initializing it like this takes *2 minutes*:

 val df = sqlContext.read.parquet("asdf")


 Is this normal? Is this because it is doing some bookkeeping to discover
 all the partitions? Is it perhaps having to merge the schema from each
 partition? Would you expect it to get better or worse if I subpartition by
 another key?

 - Philip







10:51:42  INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT
10:51:42  WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:51:42  INFO spark.SecurityManager: Changing view acls to: pweaver
10:51:42  INFO spark.SecurityManager: Changing modify acls to: pweaver
10:51:42  INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver)
10:51:43  INFO slf4j.Slf4jLogger: Slf4jLogger started
10:51:43  INFO Remoting: Starting remoting
10:51:43  INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400]
10:51:43  INFO util.Utils: Successfully started service 'sparkDriver' on port 51400.
10:51:43  INFO spark.SparkEnv: Registering MapOutputTracker
10:51:43  INFO spark.SparkEnv: Registering BlockManagerMaster
10:51:43  INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a
10:51:43  INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
10:51:43  INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc
10:51:43  INFO spark.HttpServer: Starting HTTP Server
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227
10:51:43  INFO util.Utils: Successfully started service 'HTTP file server' on port 55227.
10:51:43  INFO spark.SparkEnv: Registering OutputCommitCoordinator
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
10:51:43  INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
10:51:43  INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940
10:51:44  WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
10:51:44  INFO mesos.CoarseMesosSchedulerBackend

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
again.

The initialization time is about 1 minute now, which is still pretty
terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 Absolutely, thanks!

 On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

 We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

 Could you give it a shot to see whether it helps in your case? We've
 observed ~50x performance boost with schema merging turned on.

 Cheng


 On 8/6/15 8:26 AM, Philip Weaver wrote:

 I have a parquet directory that was produced by partitioning by two keys,
 e.g. like this:

 df.write.partitionBy("a", "b").parquet("asdf")


 There are 35 values of a, and about 1100-1200 values of b for each
 value of a, for a total of over 40,000 partitions.

 Before running any transformations or actions on the DataFrame, just
 initializing it like this takes *2 minutes*:

 val df = sqlContext.read.parquet("asdf")


 Is this normal? Is this because it is doing some bookkeeping to discover
 all the partitions? Is it perhaps having to merge the schema from each
 partition? Would you expect it to get better or worse if I subpartition by
 another key?

 - Philip







Re: spark hangs at broadcasting during a filter

2015-08-05 Thread Philip Weaver
How big is droprows?

Try explicitly broadcasting it like this:

val broadcastDropRows = sc.broadcast(dropRows)

val valsrows = ...
.filter(x => !broadcastDropRows.value.contains(x._1))

- Philip


On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote:

 I'm trying to load a 1 Tb file whose lines i,j,v represent the values of a
 matrix given as A_{ij} = v so I can convert it to a Parquet file. Only some
 of the rows of A are relevant, so the following code first loads the
 triplets as text, splits them into Tuple3[Int, Int, Double], drops
 triplets
 whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
 Double]]] for each row (if I'm judging datatypes correctly).

 val valsrows = sc.textFile(valsinpath).map(_.split(",")).
   map(x => (x(1).toInt, (x(0).toInt,
 x(2).toDouble))).
   filter(x => !droprows.contains(x._1)).
   groupByKey.
   map(x => (x._1, x._2.toSeq.sortBy(_._1)))

 Spark hangs during a broadcast that occurs during the filter step
 (according
 to the Spark UI). The last two lines in the log before it pauses are:

 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in
 memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
 in
 memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

 I've left Spark running for up to 17 minutes one time, and it never
 continues past this point. I'm using a cluster of 30 r3.8xlarge EC2
 instances (244Gb, 32 cores) with spark in standalone mode with 220G
 executor
 and driver memory, and using the Kryo serializer.

 Any ideas on what could be causing this hang?








Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
This message means that java.util.Date is not supported by Spark DataFrame.
You'll need to use java.sql.Date, I believe.
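
Concretely, a small change along these lines should be enough (a sketch,
assuming your dates look like 2015-07-27):

import java.sql.Date
import java.text.SimpleDateFormat

// Return java.sql.Date instead of java.util.Date so the case class can be
// converted to a DataFrame.
def formatStringAsDate(dateStr: String): Date =
  new Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)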

On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 That seem to be working. however i see a new exception

 Code:
 def formatStringAsDate(dateStr: String) = new
 SimpleDateFormat("yyyy-MM-dd").parse(dateStr)


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
 val rowStructText =
 sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
 f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
 f12: Integer, f13: Integer, f14: String)

 val summary = rowStructText.map(s => s.split(",")).map(
 s => Summary(formatStringAsDate(s(0)),
 s(1).replaceAll("\"", "").toLong,
 s(3).replaceAll("\"", "").toLong,
 s(4).replaceAll("\"", "").toInt,
 s(5).replaceAll("\"", ""),
 s(6).replaceAll("\"", "").toInt,
 formatStringAsDate(s(7)),
 formatStringAsDate(s(8)),
 s(9).replaceAll("\"", "").toInt,
 s(10).replaceAll("\"", "").toInt,
 s(11).replaceAll("\"", "").toFloat,
 s(12).replaceAll("\"", "").toInt,
 s(13).replaceAll("\"", "").toInt,
 s(14).replaceAll("\"", "")
 )
 ).toDF()
 summary.registerTempTable("summary")


 //Output
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.util.Date formatStringAsDate: (dateStr: String)java.util.Date
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
 MapPartitionsRDD[105] at textFile at console:60 defined class Summary x:
 org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at
 console:61 java.lang.UnsupportedOperationException: Schema for type
 java.util.Date is not supported at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)


 Any suggestions

 On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 The parallelize method does not read the contents of a file. It simply
 takes a collection and distributes it to the cluster. In this case, the
 String is a collection 67 characters.

 Use sc.textFile instead of sc.parallelize, and it should work as you want.

 On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

 val rowStructText =
 sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

 val x = rowStructText.map(s => {

 println(s)

 s}

 )

 x.count

 Questions

 1) x.count always shows 67 irrespective of the path I change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak





 --
 Deepak




Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

 We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

 Could you give it a shot to see whether it helps in your case? We've
 observed ~50x performance boost with schema merging turned on.
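
 (For reference, a sketch of toggling schema merging per read in 1.5, assuming
 the Parquet data source's mergeSchema option:)

 val df = sqlContext.read
   .option("mergeSchema", "false")   // or "true" to merge schemas across part-files
   .parquet("/path/to/table")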

 Cheng


 On 8/6/15 8:26 AM, Philip Weaver wrote:

 I have a parquet directory that was produced by partitioning by two keys,
 e.g. like this:

 df.write.partitionBy("a", "b").parquet("asdf")


 There are 35 values of a, and about 1100-1200 values of b for each
 value of a, for a total of over 40,000 partitions.

 Before running any transformations or actions on the DataFrame, just
 initializing it like this takes *2 minutes*:

 val df = sqlContext.read.parquet("asdf")


 Is this normal? Is this because it is doing some bookkeeping to discover
 all the partitions? Is it perhaps having to merge the schema from each
 partition? Would you expect it to get better or worse if I subpartition by
 another key?

 - Philip






Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
The parallelize method does not read the contents of a file. It simply
takes a collection and distributes it to the cluster. In this case, the
String is a collection of 67 characters.

Use sc.textFile instead of sc.parallelize, and it should work as you want.
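
A minimal sketch of that, reusing the path from your example:

// textFile transparently decompresses the .gz file and yields an RDD[String],
// one element per line.
val rowStructText =
  sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

println(rowStructText.count())             // number of lines, not characters
rowStructText.take(5).foreach(println)     // prints a few rows on the driver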

On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

 val rowStructText =
 sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

 val x = rowStructText.map(s => {

 println(s)

 s}

 )

 x.count

 Questions

 1) x.count always shows 67 irrespective of the path I change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak




Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
I have a parquet directory that was produced by partitioning by two keys,
e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of a, and about 1100-1200 values of b for each
value of a, for a total of over 40,000 partitions.

Before running any transformations or actions on the DataFrame, just
initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping to discover all
the partitions? Is it perhaps having to merge the schema from each
partition? Would you expect it to get better or worse if I subpartition by
another key?

- Philip


Re: Turn Off Compression for Textfiles

2015-08-04 Thread Philip Weaver
The .gz extension indicates that the file is compressed with gzip. Choose a
different extension (e.g. .txt) when you save them.
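
If the compression is actually coming from the Hadoop output settings rather
than the extension, here is a sketch of turning it off explicitly (the config
keys are the standard Hadoop ones; rdd is a placeholder):

// Make sure Hadoop output compression is disabled before saving plain text.
sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "false")
sc.hadoopConfiguration.set("mapred.output.compress", "false")   // older key name

rdd.saveAsTextFile("/path/to/output")   // no codec argument => uncompressed text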

On Tue, Aug 4, 2015 at 7:00 PM, Brandon White bwwintheho...@gmail.com
wrote:

 How do you turn off gz compression for saving as textfiles? Right now, I
 am reading .gz files and it is saving them as .gz. I would love to not
 compress them when I save.

 1) DStream.saveAsTextFiles() //no compression

 2) RDD.saveAsTextFile() //no compression

 Any ideas?



Safe to write to parquet at the same time?

2015-08-03 Thread Philip Weaver
I think this question applies regardless if I have two completely separate
Spark jobs or tasks on different machines, or two cores that are part of
the same task on the same machine.

If two jobs/tasks/cores/stages both save to the same parquet directory in
parallel like this:

df1.write.mode(SaveMode.Append).partitionBy("a", "b").parquet(dir)

df2.write.mode(SaveMode.Append).partitionBy("a", "b").parquet(dir)


Will the result be equivalent to this?

df1.unionAll(df2).write.mode(SaveMode.Append).partitionBy("a", "b").parquet(dir)


What if we ensure that 'dir' does not exist first?

- Philip


Unable to compete with performance of single-threaded Scala application

2015-08-03 Thread Philip Weaver
Hello,

I am running Spark 1.4.0 on Mesos 0.22.1, and usually I run my jobs in
coarse-grained mode.

I have written some single-threaded standalone Scala applications for a
problem
that I am working on, and I am unable to get a Spark solution that comes
close
to the performance of this application. My hope was to sacrifice some
performance to get an easily scalable solution, but I'm finding that the
single-threaded implementations consistently outperform Spark even with a
couple
dozen cores, and I'm having trouble getting Spark to scale linearly.

All files are binary files with fixed-width records, ranging from about 40
bytes
to 200 bytes per record depending on the type. The files are already
partitioned
by 3 keys, with one file for each combination. Basically the layout is
/customer/day/partition_number. The ultimate goal is to read time series
events,
join in some smaller tables when processing those events, and write the
result
to parquet. For this discussion, I'm focusing on just a simple problem:
reading
and aggregating the events.

I started with a simple experiment to walk over all the events and sum the
value
of an integer field. I implemented two standalone solutions and a Spark
solution:

1) For each file, use a BufferedInputStream to iterate over each fixed-width
   row, copy the row to a Array[Byte], and then parse the one field out of
that
   array. This can process events at about 30 million/second.

2) Memory-map each file to a java.nio.MappedByteBuffer. Calculate the sum by
   directly selecting the integer field while iterating over the rows. This
   solution can process about 100-300 million events/second.

3) Use SparkContext.binaryRecords, map over the RDD[Array[Byte]] to parse or
   select the field, and then call sum on that (sketched below).
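
A sketch of what solution #3 looks like (the record length, field offset and
path glob are illustrative, and the byte order may need adjusting for the file
format):

import java.nio.ByteBuffer

val recordLength = 48    // bytes per fixed-width record (illustrative)
val fieldOffset  = 16    // byte offset of the integer field (illustrative)

val records = sc.binaryRecords("/customer/day/*", recordLength)
val total = records
  .map(bytes => ByteBuffer.wrap(bytes).getInt(fieldOffset).toLong)
  .sum()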

Although performance is understandably much better when I use a memory
mapped
bytebuffer, I would expect my Spark solution to get the same per-core
throughput
as solution #1 above, where the record type is Array[Byte] and I'm using the
same approach to pull out the integer field from that byte array.

However, the Spark solution achieves only 1-2 million events/second on 1
core, 4
million events/second on 2 nodes with 4 cores each, and 8 million
events/second
on 6 nodes with 4 cores each. So, not only was the performance a fraction
of my
standalone application, but it can't even scale linearly to 6 nodes.

- Philip