Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread Daniel Siegmann
Those settings apply when a shuffle happens. But they don't affect the way
the data will be partitioned when it is initially read, for example
spark.read.parquet("path/to/input"). So for HDFS / S3 I think it depends on
how the data is split into chunks, but if there are lots of small chunks
Spark will automatically merge them into small partitions. There are going
to be various settings depending on what you're reading from.

val df = spark.read.parquet("path/to/input") // partitioning will depend on
the data
val df2 = df.groupBy("thing").count() // a shuffle happened, so shuffle
partitioning configuration applies


Tip: gzip files can't be split, so if you read a gzip file everything will
be in one partition. That's a good reason to avoid large gzip files. :-)

If you don't have a shuffle but you want to change how many partitions
there are, you will need to coalesce or repartition.


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Thu, Oct 26, 2017 at 11:31 AM, lucas.g...@gmail.com <lucas.g...@gmail.com
> wrote:

> Thanks Daniel!
>
> I've been wondering that for ages!
>
> IE where my JDBC sourced datasets are coming up with 200 partitions on
> write to S3.
>
> What do you mean for (except for the initial read)?
>
> Can you explain that a bit further?
>
> Gary Lucas
>
> On 26 October 2017 at 11:28, Daniel Siegmann <dsiegmann@securityscorecard.
> io> wrote:
>
>> When working with datasets, Spark uses spark.sql.shuffle.partitions. It
>> defaults to 200. Between that and the default parallelism you can control
>> the number of partitions (except for the initial read).
>>
>> More info here: http://spark.apache.org/docs/l
>> atest/sql-programming-guide.html#other-configuration-options
>>
>> I have no idea why it defaults to a fixed 200 (while default parallelism
>> defaults to a number scaled to your number of cores), or why there are two
>> separate configuration properties.
>>
>>
>> --
>> Daniel Siegmann
>> Senior Software Engineer
>> *SecurityScorecard Inc.*
>> 214 W 29th Street, 5th Floor
>> <https://maps.google.com/?q=214+W+29th+Street,+5th+FloorNew+York,+NY+10001=gmail=g>
>> New York, NY 10001
>> <https://maps.google.com/?q=214+W+29th+Street,+5th+FloorNew+York,+NY+10001=gmail=g>
>>
>>
>> On Thu, Oct 26, 2017 at 9:53 AM, Deepak Sharma <deepakmc...@gmail.com>
>> wrote:
>>
>>> I guess the issue is spark.default.parallelism is ignored when you are
>>> working with Data frames.It is supposed to work with only raw RDDs.
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Thu, Oct 26, 2017 at 10:05 PM, Noorul Islam Kamal Malmiyoda <
>>> noo...@noorul.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have the following spark configuration
>>>>
>>>> spark.app.name=Test
>>>> spark.cassandra.connection.host=127.0.0.1
>>>> spark.cassandra.connection.keep_alive_ms=5000
>>>> spark.cassandra.connection.port=1
>>>> spark.cassandra.connection.timeout_ms=3
>>>> spark.cleaner.ttl=3600
>>>> spark.default.parallelism=4
>>>> spark.master=local[2]
>>>> spark.ui.enabled=false
>>>> spark.ui.showConsoleProgress=false
>>>>
>>>> Because I am setting spark.default.parallelism to 4, I was expecting
>>>> only 4 spark partitions. But it looks like it is not the case
>>>>
>>>> When I do the following
>>>>
>>>> df.foreachPartition { partition =>
>>>>   val groupedPartition = partition.toList.grouped(3).toList
>>>>   println("Grouped partition " + groupedPartition)
>>>> }
>>>>
>>>> There are too many print statements with empty list at the top. Only
>>>> the relevant partitions are at the bottom. Is there a way to control
>>>> number of partitions?
>>>>
>>>> Regards,
>>>> Noorul
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>>
>


Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread Daniel Siegmann
When working with datasets, Spark uses spark.sql.shuffle.partitions. It
defaults to 200. Between that and the default parallelism you can control
the number of partitions (except for the initial read).

More info here:
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

I have no idea why it defaults to a fixed 200 (while default parallelism
defaults to a number scaled to your number of cores), or why there are two
separate configuration properties.


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Thu, Oct 26, 2017 at 9:53 AM, Deepak Sharma <deepakmc...@gmail.com>
wrote:

> I guess the issue is spark.default.parallelism is ignored when you are
> working with Data frames.It is supposed to work with only raw RDDs.
>
> Thanks
> Deepak
>
> On Thu, Oct 26, 2017 at 10:05 PM, Noorul Islam Kamal Malmiyoda <
> noo...@noorul.com> wrote:
>
>> Hi all,
>>
>> I have the following spark configuration
>>
>> spark.app.name=Test
>> spark.cassandra.connection.host=127.0.0.1
>> spark.cassandra.connection.keep_alive_ms=5000
>> spark.cassandra.connection.port=1
>> spark.cassandra.connection.timeout_ms=3
>> spark.cleaner.ttl=3600
>> spark.default.parallelism=4
>> spark.master=local[2]
>> spark.ui.enabled=false
>> spark.ui.showConsoleProgress=false
>>
>> Because I am setting spark.default.parallelism to 4, I was expecting
>> only 4 spark partitions. But it looks like it is not the case
>>
>> When I do the following
>>
>> df.foreachPartition { partition =>
>>   val groupedPartition = partition.toList.grouped(3).toList
>>   println("Grouped partition " + groupedPartition)
>> }
>>
>> There are too many print statements with empty list at the top. Only
>> the relevant partitions are at the bottom. Is there a way to control
>> number of partitions?
>>
>> Regards,
>> Noorul
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
On Thu, Sep 28, 2017 at 7:23 AM, Gourav Sengupta 
wrote:

>
> I will be very surprised if someone tells me that a 1 GB CSV text file is
> automatically split and read by multiple executors in SPARK. It does not
> matter whether it stays in HDFS, S3 or any other system.
>

I can't speak to *any* system, but I can confirm for HDFS, S3, and local
filesystems a 1 GB CSV file would be split.


>
> Now if someone tells me that in case I have a smaller CSV file of 100MB
> size and that will be split while being read, that will also be surprising.
>

I'm not sure what the default is. It may be 128 MB, in which case that file
would not be split.

Keep in mind gzipped files cannot be split. If you have very large text
files and you want to compress them, and they will be > a few hundred MB
compressed, you should probably use bzip2 instead (which can be split).


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
> Can you kindly explain how Spark uses parallelism for bigger (say 1GB)
> text file? Does it use InputFormat do create multiple splits and creates 1
> partition per split? Also, in case of S3 or NFS, how does the input split
> work? I understand for HDFS files are already pre-split so Spark can use
> dfs.blocksize to determine partitions. But how does it work other than HDFS?
>

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides
to split for the local filesystem. But it does. Maybe someone else will be
able to explain the details.


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
> no matter what you do and how many nodes you start, in case you have a
> single text file, it will not use parallelism.
>

This is not true, unless the file is small or is gzipped (gzipped files
cannot be split).


Re: Documentation on "Automatic file coalescing for native data sources"?

2017-05-26 Thread Daniel Siegmann
Thanks for the help everyone.

It seems the automatic coalescing doesn't happen when accessing ORC data
through a Hive metastore unless you configure
spark.sql.hive.convertMetastoreOrc to be true (it is false by default). I'm
not sure if this is documented somewhere, or if there's any reason not to
enable it, but I haven't had any problem with it.


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Sat, May 20, 2017 at 9:14 PM, Kabeer Ahmed <kab...@gmx.co.uk> wrote:

> Thank you Takeshi.
>
> As far as I see from the code pointed, the default number of bytes to pack
> in a partition is set to 128MB - size of the parquet block size.
>
> Daniel,
>
> It seems you do have a need to modify the number of bytes you want to pack
> per partition. I am curious to know the scenario. Please share if you can.
>
> Thanks,
> Kabeer.
>
> On May 20 2017, at 4:54 pm, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> I think this document points to a logic here: https://github.com/
>> apache/spark/blob/master/sql/core/src/main/scala/org/
>> apache/spark/sql/execution/DataSourceScanExec.scala#L418
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala?recipient=dsiegmann%40securityscorecard.io#L418>
>>
>> This logic merge small files into a partition and you can control this
>> threshold via `spark.sql.files.maxPartitionBytes`.
>>
>> // maropu
>>
>>
>> On Sat, May 20, 2017 at 8:15 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>> I think like all other read operations, it is driven by input format
>> used, and I think some variation of combine file input format is used by
>> default. I think you can test it by force a particular input format which
>> gets ine file per split, then you should end up with same number of
>> partitions as your dsta files
>>
>> On Sat, 20 May 2017 at 5:12 am, Aakash Basu <aakash.spark@gmail.com>
>> wrote:
>>
>> Hey all,
>>
>> A reply on this would be great!
>>
>> Thanks,
>> A.B.
>>
>> On 17-May-2017 1:43 AM, "Daniel Siegmann" <dsiegm...@securityscorecard.io>
>> wrote:
>>
>> When using spark.read on a large number of small files, these are
>> automatically coalesced into fewer partitions. The only documentation I can
>> find on this is in the Spark 2.0.0 release notes, where it simply says (
>> http://spark.apache.org/releases/spark-release-2-0-0.html
>> <http://spark.apache.org/releases/spark-release-2-0-0.html?recipient=dsiegmann%40securityscorecard.io>
>> ):
>>
>> "Automatic file coalescing for native data sources"
>>
>> Can anyone point me to documentation explaining what triggers this
>> feature, how it decides how many partitions to coalesce to, and what counts
>> as a "native data source"? I couldn't find any mention of this feature in
>> the SQL Programming Guide and Google was not helpful.
>>
>> --
>> Daniel Siegmann
>> Senior Software Engineer
>> *SecurityScorecard Inc.*
>> 214 W 29th Street, 5th Floor
>> New York, NY 10001
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Documentation on "Automatic file coalescing for native data sources"?

2017-05-16 Thread Daniel Siegmann
When using spark.read on a large number of small files, these are
automatically coalesced into fewer partitions. The only documentation I can
find on this is in the Spark 2.0.0 release notes, where it simply says (
http://spark.apache.org/releases/spark-release-2-0-0.html):

"Automatic file coalescing for native data sources"

Can anyone point me to documentation explaining what triggers this feature,
how it decides how many partitions to coalesce to, and what counts as a
"native data source"? I couldn't find any mention of this feature in the
SQL Programming Guide and Google was not helpful.

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: [Spark] Accumulators or count()

2017-03-01 Thread Daniel Siegmann
As you noted, Accumulators do not guarantee accurate results except in
specific situations. I recommend never using them.

This article goes into some detail on the problems with accumulators:
http://imranrashid.com/posts/Spark-Accumulators/


On Wed, Mar 1, 2017 at 7:26 AM, Charles O. Bajomo <
charles.baj...@pretechconsulting.co.uk> wrote:

> Hello everyone,
>
> I wanted to know if there is any benefit to using an acculumator over just
> executing a count() on the whole RDD. There seems to be a lot of issues
> with accumulator during a stage failure and also seems to be an issue
> rebuilding them if the application restarts from a checkpoint. Anyone have
> any suggestions no this?
>
> Thanks
>


Re: Spark #cores

2017-01-18 Thread Daniel Siegmann
I am not too familiar with Spark Standalone, so unfortunately I cannot give
you any definite answer. I do want to clarify something though.

The properties spark.sql.shuffle.partitions and spark.default.parallelism
affect how your data is split up, which will determine the *total* number
of tasks, *NOT* the number of tasks being run in parallel. Except of course
you will never run more tasks in parallel than there are total, so if your
data is small you might be able to control it via these parameters - but
that wouldn't typically be how you'd use these parameters.

On YARN as you noted there is spark.executor.instances as well as
spark.executor.cores, and you'd multiple them to determine the maximum
number of tasks that would run in parallel on your cluster. But there is no
guarantee the executors would be distributed evenly across nodes.

Unfortunately I'm not familiar with how this works on Spark Standalone.
Your expectations seem reasonable to me. Sorry I can't be helpful,
hopefully someone else will be able to explain exactly how this works.


Re: Why does Spark 2.0 change number or partitions when reading a parquet file?

2016-12-22 Thread Daniel Siegmann
Spark 2.0.0 introduced "Automatic file coalescing for native data sources" (
http://spark.apache.org/releases/spark-release-2-0-0.html#performance-and-runtime).
Perhaps that is the cause?

I'm not sure if this feature is mentioned anywhere in the documentation or
if there's any way to disable it.


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Thu, Dec 22, 2016 at 11:09 AM, Kristina Rogale Plazonic <kpl...@gmail.com
> wrote:

> Hi,
>
> I write a randomly generated 30,000-row dataframe to parquet. I verify
> that it has 200 partitions (both in Spark and inspecting the parquet file
> in hdfs).
>
> When I read it back in, it has 23 partitions?! Is there some optimization
> going on? (This doesn't happen in Spark 1.5)
>
> *How can I force it to read back the same partitions i.e. 200?* I'm
> trying to reproduce a problem that depends on partitioning and can't
> because the number of partitions goes way down.
>
> Thanks for any insights!
> Kristina
>
> Here is the code and output:
>
> scala> spark.version
> res13: String = 2.0.2
>
> scala> df.show(2)
> +---+---+--+--+--+--
> +++
> | id|id2|  strfeat0|  strfeat1|  strfeat2|
>  strfeat3|binfeat0|binfeat1|
> +---+---+--+--+--+--
> +++
> |  0|12345678901 <(234)%20567-8901>|fcvEmHTZte|
>  null|fnuAQdnBkJ|aU3puFMq5h|   1|   1|
> |  1|12345678902 <(234)%20567-8902>|  
> null|rtcrPaAVNX|fnuAQdnBkJ|x6NyoX662X|
>   0|   0|
> +---+---+--+--+--+--
> +++
> only showing top 2 rows
>
>
> scala> df.count
> res15: Long = 30001
>
> scala> df.rdd.partitions.size
> res16: Int = 200
>
> scala> df.write.parquet("/tmp/df")
>
>
> scala> val newdf = spark.read.parquet("/tmp/df")
> newdf: org.apache.spark.sql.DataFrame = [id: int, id2: bigint ... 6 more
> fields]
>
> scala> newdf.rdd.partitions.size
> res18: Int = 23
>
>
> [kris@airisdata195 ~]$ hdfs dfs -ls /tmp/df
> Found 201 items
> -rw-r--r--   3 kris supergroup  0 2016-12-22 11:01 /tmp/df/_SUCCESS
> -rw-r--r--   3 kris supergroup   4974 2016-12-22 11:01
> /tmp/df/part-r-0-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
> -rw-r--r--   3 kris supergroup   4914 2016-12-22 11:01
> /tmp/df/part-r-1-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
> .
> . (omitted output)
> .
> -rw-r--r--   3 kris supergroup   4893 2016-12-22 11:01
> /tmp/df/part-r-00198-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
> -rw-r--r--   3 kris supergroup   4981 2016-12-22 11:01
> /tmp/df/part-r-00199-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
>
>


Re: Few questions on reliability of accumulators value.

2016-12-12 Thread Daniel Siegmann
Accumulators are generally unreliable and should not be used. The answer to
(2) and (4) is yes. The answer to (3) is both.

Here's a more in-depth explanation:
http://imranrashid.com/posts/Spark-Accumulators/

On Sun, Dec 11, 2016 at 11:27 AM, Sudev A C  wrote:

> Please help.
> Anyone, any thoughts on the previous mail ?
>
> Thanks
> Sudev
>
>
> On Fri, Dec 9, 2016 at 2:28 PM Sudev A C  wrote:
>
>> Hi,
>>
>> Can anyone please help clarity on how accumulators can be used reliably
>> to measure error/success/analytical metrics ?
>>
>> Given below is use case / code snippet that I have.
>>
>> val amtZero = sc.accumulator(0)
>> val amtLarge = sc.accumulator(0)
>> val amtNormal = sc.accumulator(0)
>> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos))
>> {
>>   amtZero.add(1)
>>   0.0
>> } else {
>>   val amount = x.getDouble(4)
>>   if (amount > 1) amtLarge.add(1) else amtNormal.add(1)
>>   amount
>> }
>> mrdd = rdd.map(s => (s, getAmount(s)))
>> mrdd.cache()
>> another_mrdd = rdd2.map(s => (s, getAmount(s)))
>> mrdd.save_to_redshift
>> another_mrdd.save_to_redshift
>> mrdd.union(another_mrdd).map().groupByKey().save_to_redshift
>>
>>
>>
>> // Get values from accumulators and persist it to a reliable store for
>> analytics.
>> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)
>>
>>
>>
>> Few questions :
>>
>> 1. How many times should I expect the counts for items within mrdd and
>> another_mrdd since both of these rdd's are being reused ? What happens when
>> a part of DAG is skipped due to caching in between (say I'm caching
>> only mrdd)?
>>
>> 2. Should I be worried about any possible stage/task failures (due to
>> master-wroker network issues/resource-starvation/speculative-execution),
>> can these events lead to wrong counts ?
>>
>> 3. Document says  **In transformations, users should be aware of that
>> each task’s update may be applied more than once if tasks or job stages are
>> re-executed.**
>> Here re-execution of stages/tasks is referring to failure re-executions
>> or re-execution due to stage/tasks position in DAG ?
>>
>> 4. Is it safe to say that usage of accumulators(for exact counts) are
>> limited to .foreach() as actions guarantee exactly once updates ?
>>
>> Thanks
>> Sudev
>>
>>
>>
>>


Re: CSV to parquet preserving partitioning

2016-11-15 Thread Daniel Siegmann
Did you try unioning the datasets for each CSV into a single dataset? You
may need to put the directory name into a column so you can partition by it.

On Tue, Nov 15, 2016 at 8:44 AM, benoitdr 
wrote:

> Hello,
>
> I'm trying to convert a bunch of csv files to parquet, with the interesting
> case that the input csv files are already "partitioned" by directory.
> All the input files have the same set of columns.
> The input files structure looks like :
>
> /path/dir1/file1.csv
> /path/dir1/file2.csv
> /path/dir2/file3.csv
> /path/dir3/file4.csv
> /path/dir3/file5.csv
> /path/dir3/file6.csv
>
> I'd like to read those files and write their data to a parquet table in
> hdfs, preserving the partitioning (partitioned by input directory), and
> such
> as there is a single output file per partition.
> The output files strucutre should look like :
>
> hdfs://path/dir=dir1/part-r-xxx.gz.parquet
> hdfs://path/dir=dir2/part-r-yyy.gz.parquet
> hdfs://path/dir=dir3/part-r-zzz.gz.parquet
>
>
> The best solution I have found so far is to loop among the input
> directories, loading the csv files in a dataframe and to write the
> dataframe
> in the target partition.
> But this not efficient since I want a single output file per partition, the
> writing to hdfs is a single tasks that blocks the loop.
> I wonder how to achieve this with a maximum of parallelism (and without
> shuffling the data in the cluster).
>
> Thanks !
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Quirk in how Spark DF handles JSON input records?

2016-11-02 Thread Daniel Siegmann
Yes, it needs to be on a single line. Spark (or Hadoop really) treats
newlines as a record separator by default. While it is possible to use a
different string as a record separator, what would you use in the case of
JSON?

If you do some Googling I suspect you'll find some possible solutions.
Personally, I would just use a separate JSON library (e.g. json4s) to parse
this metadata into an object, rather than trying to read it in through
Spark.

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: UseCase_Design_Help

2016-10-05 Thread Daniel Siegmann
I think it's fine to read animal types locally because there are only 70 of
them. It's just that you want to execute the Spark actions in parallel. The
easiest way to do that is to have only a single action.

Instead of grabbing the result right away, I would just add a column for
the animal type and union the datasets for the animal types. Something like
this (not sure if the syntax is correct):

val animalCounts: DataFrame = animalTypes.map { anmtyp =>
sqlContext.sql("select lit("+anmtyp+") as animal_type,
count(distinct("+anmtyp+")) from TEST1 ")
}.reduce(_.union(_))

animalCounts.foreach( /* print the output */ )

On Wed, Oct 5, 2016 at 12:42 AM, Daniel  wrote:

> First of all, if you want to read a txt file in Spark, you should use
> sc.textFile, because you are using "Source.fromFile", so you are reading it
> with Scala standard api, so it will be read sequentially.
>
> Furthermore you are going to need create a schema if you want to use
> dataframes.
>
> El 5/10/2016 1:53, "Ajay Chander"  escribió:
>
>> Right now, I am doing it like below,
>>
>> import scala.io.Source
>>
>> val animalsFile = "/home/ajay/dataset/animal_types.txt"
>> val animalTypes = Source.fromFile(animalsFile).getLines.toArray
>>
>> for ( anmtyp <- animalTypes ) {
>>   val distinctAnmTypCount = sqlContext.sql("select
>> count(distinct("+anmtyp+")) from TEST1 ")
>>   println("Calculating Metrics for Animal Type: "+anmtyp)
>>   if( distinctAnmTypCount.head().getAs[Long](0) <= 10 ){
>> println("Animal Type: "+anmtyp+" has <= 10 distinct values")
>>   } else {
>> println("Animal Type: "+anmtyp+" has > 10 distinct values")
>>   }
>> }
>>
>> But the problem is it is running sequentially.
>>
>> Any inputs are appreciated. Thank you.
>>
>>
>> Regards,
>> Ajay
>>
>>
>> On Tue, Oct 4, 2016 at 7:44 PM, Ajay Chander  wrote:
>>
>>> Hi Everyone,
>>>
>>> I have a use-case where I have two Dataframes like below,
>>>
>>> 1) First Dataframe(DF1) contains,
>>>
>>> *ANIMALS*
>>> Mammals
>>> Birds
>>> Fish
>>> Reptiles
>>> Amphibians
>>>
>>> 2) Second Dataframe(DF2) contains,
>>>
>>> *ID, Mammals, Birds, Fish, Reptiles, Amphibians*
>>> 1,  Dogs,  Eagle,  Goldfish,  NULL,  Frog
>>> 2,  Cats,  Peacock,  Guppy, Turtle,  Salamander
>>> 3,  Dolphins,  Eagle,  Zander,  NULL,  Frog
>>> 4,  Whales,  Parrot,  Guppy,  Snake,  Frog
>>> 5,  Horses,  Owl,  Guppy,  Snake,  Frog
>>> 6,  Dolphins,  Kingfisher,  Zander,  Turtle,  Frog
>>> 7,  Dogs,  Sparrow,  Goldfish,  NULL,  Salamander
>>>
>>> Now I want to take each row from DF1 and find out its distinct count in
>>> DF2. Example, pick Mammals from DF1 then find out count(distinct(Mammals))
>>> from DF2 i.e. 5
>>>
>>> DF1 has 70 distinct rows/Animal types
>>> DF2 has some million rows
>>>
>>> Whats the best way to achieve this efficiently using parallelism ?
>>>
>>> Any inputs are helpful. Thank you.
>>>
>>> Regards,
>>> Ajay
>>>
>>>
>>


Access S3 buckets in multiple accounts

2016-09-27 Thread Daniel Siegmann
I am running Spark on Amazon EMR and writing data to an S3 bucket. However,
the data is read from an S3 bucket in a separate AWS account. Setting the
fs.s3a.access.key and fs.s3a.secret.key values is sufficient to get access
to the other account (using the s3a protocol), however I then won't have
access to the S3 bucket in the EMR cluster's AWS account.

Is there any way for Spark to access S3 buckets in multiple accounts? If
not, is there any best practice for how to work around this?

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Dataset encoder for java.time.LocalDate?

2016-09-02 Thread Daniel Siegmann
It seems Spark can handle case classes with java.sql.Date, but not
java.time.LocalDate. It complains there's no encoder.

Are there any plans to add an encoder for LocalDate (and other classes in
the new Java 8 Time and Date API), or is there an existing library I can
use that provides encoders?

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: What are using Spark for

2016-08-02 Thread Daniel Siegmann
Yes, you can use Spark for ETL, as well as feature engineering, training,
and scoring.

~Daniel Siegmann

On Tue, Aug 2, 2016 at 3:29 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> If I may say, if you spend  sometime going through this mailing list in
> this forum and see the variety of topics that users are discussing, then
> you may get plenty of ideas about Spark application in real life..
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 2 August 2016 at 19:17, Karthik Ramakrishnan <
> karthik.ramakrishna...@gmail.com> wrote:
>
>> We used Storm for ETL, now currently thinking Spark might be advantageous
>> since some ML also is coming our way.
>>
>> - Karthik
>>
>> On Tue, Aug 2, 2016 at 1:10 PM, Rohit L <rohitfor...@gmail.com> wrote:
>>
>>> Does anyone use Spark for ETL?
>>>
>>> On Tue, Aug 2, 2016 at 1:24 PM, Sonal Goyal <sonalgoy...@gmail.com>
>>> wrote:
>>>
>>>> Hi Rohit,
>>>>
>>>> You can check the powered by spark page for some real usage of Spark.
>>>>
>>>> https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark
>>>>
>>>>
>>>> On Tuesday, August 2, 2016, Rohit L <rohitfor...@gmail.com> wrote:
>>>>
>>>>> Hi Everyone,
>>>>>
>>>>>   I want to know the real world uses cases for which Spark is used
>>>>> and hence can you please share for what purpose you are using Apache Spark
>>>>> in your project?
>>>>>
>>>>> --
>>>>> Rohit
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Sonal
>>>> Founder, Nube Technologies <http://www.nubetech.co>
>>>> Reifier at Strata Hadoop World
>>>> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
>>>> Reifier at Spark Summit 2015
>>>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>>>
>>>> <http://in.linkedin.com/in/sonalgoyal>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>


Re: Apache design patterns

2016-06-09 Thread Daniel Siegmann
On Tue, Jun 7, 2016 at 11:43 PM, Francois Le Roux 
wrote:

> 1.   Should I use dataframes to ‘pull the source data? If so, do I do
> a groupby and order by as part of the SQL query?
>
Seems reasonable. If you use Scala you might want to define a case class
and convert the data frame to a dataset (I find the API of the latter
easier to work with), either before or after you group.


> 2.   How do I then split the grouped data (i.e. driver ID key value
> pairs) to then be parallelized for concurrent processing (i.e. ideally the
> number of parallel datasets/grouped data should run at max node cluster
> capacity)? DO I need to do some sort of mappartitioning ?
>
Spark partitions the data. Each partition can be processed in parallel (if
you look at the list of tasks for a stage in the UI, each task is a single
partition).

The problem you could run into is if you have many records for a given
driver, since they will all end up in the same partition. Look out for that
(you can see in the UI how much data is being processed by each task).

The number of partitions for stages which read the data depend on how the
data is stored (watch out for large gzipped files, as they cannot be
split). When data is shuffled (e.g. for a group by) it will be
repartitioned, with the number of partitions being determined by the
properties spark.default.parallelism and spark.sql.shuffle.partitions.

See
http://spark.apache.org/docs/latest/configuration.html#execution-behavior
and
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
.


> 3.   Pending (1) & (2) answers: How does each (i.e. grouped data set)
> dataframe or RDD or dataset perform these rules based checks (i.e. backward
> and forward looking checks) ? i.e. how is this achieved in SPARK?
>
Once you've grouped by driver ID, I assume you'll want to use a map
function. If you want to output multiple records (un-grouping after you've
processed them), you can use flatMap.

You might want to verify your grouped data is in row ID order. You need to
be careful about assuming things are ordered in a particular way in a
distributed system.


Re: Saving Parquet files to S3

2016-06-09 Thread Daniel Siegmann
I don't believe there's anyway to output files of a specific size. What you
can do is partition your data into a number of partitions such that the
amount of data they each contain is around 1 GB.

On Thu, Jun 9, 2016 at 7:51 AM, Ankur Jain  wrote:

> Hello Team,
>
>
>
> I want to write parquet files to AWS S3, but I want to size each file size
> to 1 GB.
>
> Can someone please guide me on how I can achieve the same?
>
>
>
> I am using AWS EMR with spark 1.6.1.
>
>
>
> Thanks,
>
> Ankur
> Information transmitted by this e-mail is proprietary to YASH Technologies
> and/ or its Customers and is intended for use only by the individual or
> entity to which it is addressed, and may contain information that is
> privileged, confidential or exempt from disclosure under applicable law. If
> you are not the intended recipient or it appears that this mail has been
> forwarded to you without proper authority, you are notified that any use or
> dissemination of this information in any manner is strictly prohibited. In
> such cases, please notify us immediately at i...@yash.com and delete this
> mail from your records.
>


Re: [ML] Training with bias

2016-04-12 Thread Daniel Siegmann
Yes, that's what I was looking for. Thanks.

On Tue, Apr 12, 2016 at 9:28 AM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Are you referring to fitting the intercept term? You can use
> lr.setFitIntercept (though it is true by default):
>
> scala> lr.explainParam(lr.fitIntercept)
> res27: String = fitIntercept: whether to fit an intercept term (default:
> true)
>
> On Mon, 11 Apr 2016 at 21:59 Daniel Siegmann <daniel.siegm...@teamaol.com>
> wrote:
>
>> I'm trying to understand how I can add a bias when training in Spark. I
>> have only a vague familiarity with this subject, so I hope this question
>> will be clear enough.
>>
>> Using liblinear a bias can be set - if it's >= 0, there will be an
>> additional weight appended in the model, and predicting with that model
>> will automatically append a feature for the bias.
>>
>> Is there anything similar in Spark, such as for logistic regression? The
>> closest thing I can find is MLUtils.appendBias, but this seems to
>> require manual work on both the training and scoring side. I was hoping for
>> something that would just be part of the model.
>>
>>
>> ~Daniel Siegmann
>>
>


[ML] Training with bias

2016-04-11 Thread Daniel Siegmann
I'm trying to understand how I can add a bias when training in Spark. I
have only a vague familiarity with this subject, so I hope this question
will be clear enough.

Using liblinear a bias can be set - if it's >= 0, there will be an
additional weight appended in the model, and predicting with that model
will automatically append a feature for the bias.

Is there anything similar in Spark, such as for logistic regression? The
closest thing I can find is MLUtils.appendBias, but this seems to require
manual work on both the training and scoring side. I was hoping for
something that would just be part of the model.

~Daniel Siegmann


Re: cluster randomly re-starting jobs

2016-03-21 Thread Daniel Siegmann
Never used Ambari and I don't know if this is your problem, but I have seen
similar behavior. In my case, my application failed and Hadoop kicked off a
second attempt. I didn't realize this, but when I refreshed the Spark UI,
suddenly everything seemed reset! This is because the application ID is
part of the URL, but not the attempt ID, so when the context for the second
attempt starts it will be at the same URL as the context for the first job.

To verify if this is the problem you could look at the application in the
Hadoop console (or whatever the equivalent is on Ambari) and see if there
are multiple attempts. You can also see it in the Spark history server
(under incomplete applications, if the second attempt is still running).

~Daniel Siegmann

On Mon, Mar 21, 2016 at 9:58 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you provide a bit more information ?
>
> Release of Spark and YARN
>
> Have you checked Spark UI / YARN job log to see if there is some clue ?
>
> Cheers
>
> On Mon, Mar 21, 2016 at 6:21 AM, Roberto Pagliari <
> roberto.pagli...@asos.com> wrote:
>
>> I noticed that sometimes the spark cluster seems to restart the job
>> completely.
>>
>> In the Ambari UI (where I can check jobs/stages) everything that was done
>> up to a certain point is removed, and the job is restarted.
>>
>> Does anyone know what the issue could be?
>>
>> Thank you,
>>
>>
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Daniel Siegmann
Thanks for the pointer to those indexers, those are some good examples. A
good way to go for the trainer and any scoring done in Spark. I will
definitely have to deal with scoring in non-Spark systems though.

I think I will need to scale up beyond what single-node liblinear can
practically provide. The system will need to handle much larger sub-samples
of this data (and other projects might be larger still). Additionally, the
system needs to train many models in parallel (hyper-parameter optimization
with n-fold cross-validation, multiple algorithms, different sets of
features).

Still, I suppose we'll have to consider whether Spark is the best system
for this. For now though, my job is to see what can be achieved with Spark.



On Fri, Mar 11, 2016 at 12:45 PM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Ok, I think I understand things better now.
>
> For Spark's current implementation, you would need to map those features
> as you mention. You could also use say StringIndexer -> OneHotEncoder or
> VectorIndexer. You could create a Pipeline to deal with the mapping and
> training (e.g.
> http://spark.apache.org/docs/latest/ml-guide.html#example-pipeline).
> Pipeline supports persistence.
>
> But it depends on your scoring use case too - a Spark pipeline can be
> saved and then reloaded, but you need all of Spark dependencies in your
> serving app which is often not ideal. If you're doing bulk scoring offline,
> then it may suit.
>
> Honestly though, for that data size I'd certainly go with something like
> Liblinear :) Spark will ultimately scale better with # training examples
> for very large scale problems. However there are definitely limitations on
> model dimension and sparse weight vectors currently. There are potential
> solutions to these but they haven't been implemented as yet.
>
> On Fri, 11 Mar 2016 at 18:35 Daniel Siegmann <daniel.siegm...@teamaol.com>
> wrote:
>
>> On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath <nick.pentre...@gmail.com
>> > wrote:
>>
>>> Would you mind letting us know the # training examples in the datasets?
>>> Also, what do your features look like? Are they text, categorical etc? You
>>> mention that most rows only have a few features, and all rows together have
>>> a few 10,000s features, yet your max feature value is 20 million. How are
>>> your constructing your feature vectors to get a 20 million size? The only
>>> realistic way I can see this situation occurring in practice is with
>>> feature hashing (HashingTF).
>>>
>>
>> The sub-sample I'm currently training on is about 50K rows, so ... small.
>>
>> The features causing this issue are numeric (int) IDs for ... lets call
>> it "Thing". For each Thing in the record, we set the feature Thing.id to
>> a value of 1.0 in our vector (which is of course a SparseVector). I'm
>> not sure how IDs are generated for Things, but they can be large numbers.
>>
>> The largest Thing ID is around 20 million, so that ends up being the size
>> of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
>> this data. The mean number of features per record in what I'm currently
>> training against is 41, while the maximum for any given record was 1754.
>>
>> It is possible to map the features into a small set (just need to
>> zipWithIndex), but this is undesirable because of the added complexity (not
>> just for the training, but also anything wanting to score against the
>> model). It might be a little easier if this could be encapsulated within
>> the model object itself (perhaps via composition), though I'm not sure how
>> feasible that is.
>>
>> But I'd rather not bother with dimensionality reduction at all - since we
>> can train using liblinear in just a few minutes, it doesn't seem necessary.
>>
>>
>>>
>>> MultivariateOnlineSummarizer uses dense arrays, but it should be
>>> possible to enable sparse data. Though in theory, the result will tend to
>>> be dense anyway, unless you have very many entries in the input feature
>>> vector that never occur and are actually zero throughout the data set
>>> (which it seems is the case with your data?). So I doubt whether using
>>> sparse vectors for the summarizer would improve performance in general.
>>>
>>
>> Yes, that is exactly my case - the vast majority of entries in the input
>> feature vector will *never* occur. Presumably that means most of the
>> values in the aggregators' arrays will be zero.
>>
>>
>>>
>>> LR doesn't accept a sparse weight vector, as it uses dense vectors for
>>> coefficients and gradients currently. When using L1 regularization, it
>>> could support sparse weight vectors, but the current implementation doesn't
>>> do that yet.
>>>
>>
>> Good to know it is theoretically possible to implement. I'll have to give
>> it some thought. In the meantime I guess I'll experiment with coalescing
>> the data to minimize the communication overhead.
>>
>> Thanks again.
>>
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Daniel Siegmann
On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath 
wrote:

> Would you mind letting us know the # training examples in the datasets?
> Also, what do your features look like? Are they text, categorical etc? You
> mention that most rows only have a few features, and all rows together have
> a few 10,000s features, yet your max feature value is 20 million. How are
> your constructing your feature vectors to get a 20 million size? The only
> realistic way I can see this situation occurring in practice is with
> feature hashing (HashingTF).
>

The sub-sample I'm currently training on is about 50K rows, so ... small.

The features causing this issue are numeric (int) IDs for ... lets call it
"Thing". For each Thing in the record, we set the feature Thing.id to a
value of 1.0 in our vector (which is of course a SparseVector). I'm not
sure how IDs are generated for Things, but they can be large numbers.

The largest Thing ID is around 20 million, so that ends up being the size
of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
this data. The mean number of features per record in what I'm currently
training against is 41, while the maximum for any given record was 1754.

It is possible to map the features into a small set (just need to
zipWithIndex), but this is undesirable because of the added complexity (not
just for the training, but also anything wanting to score against the
model). It might be a little easier if this could be encapsulated within
the model object itself (perhaps via composition), though I'm not sure how
feasible that is.

But I'd rather not bother with dimensionality reduction at all - since we
can train using liblinear in just a few minutes, it doesn't seem necessary.


>
> MultivariateOnlineSummarizer uses dense arrays, but it should be possible
> to enable sparse data. Though in theory, the result will tend to be dense
> anyway, unless you have very many entries in the input feature vector that
> never occur and are actually zero throughout the data set (which it seems
> is the case with your data?). So I doubt whether using sparse vectors for
> the summarizer would improve performance in general.
>

Yes, that is exactly my case - the vast majority of entries in the input
feature vector will *never* occur. Presumably that means most of the values
in the aggregators' arrays will be zero.


>
> LR doesn't accept a sparse weight vector, as it uses dense vectors for
> coefficients and gradients currently. When using L1 regularization, it
> could support sparse weight vectors, but the current implementation doesn't
> do that yet.
>

Good to know it is theoretically possible to implement. I'll have to give
it some thought. In the meantime I guess I'll experiment with coalescing
the data to minimize the communication overhead.

Thanks again.


Re: Spark ML - Scaling logistic regression for many features

2016-03-10 Thread Daniel Siegmann
Hi Nick,

Thanks for the feedback and the pointers. I tried coalescing to fewer
partitions and improved the situation dramatically. As you suggested, it is
communication overhead dominating the overall runtime.

The training run I mentioned originally had 900 partitions. Each tree
aggregation has two stages, one for the original partitions, and then one
with the aggregation into a smaller number (at 900 partitions the second
stage was 30). The first tree aggregation job (the longer one) uses the
MultivariateOnlineSummarizer you mentioned, while the subsequent
aggregation jobs use LogisticAggregator (similar problem, though smaller).

I've run some tests with fewer partitions on a very similar data set. 400
partitions took 8 hours, 100 partitions took 4 hours, and 10 partitions
took 1.4 hours. I put some screenshots from the Spark UI here:
http://imgur.com/a/trRJU

Still, these numbers seem oddly high. With 10 partitions it's shuffling
only some 200 MB per job, but the median "Getting Result Time" is 2.1
minutes. I would expected it to take *seconds* to transfer that data.

Anyway, the MultivariateOnlineSummarizer creates several arrays of doubles
equal to the size of the vector - arrays of course are inherently dense.
While this is only one iteration it is the longest, taking a significant
portion of the time by itself. LogisticAggregator meanwhile has fewer
arrays, but if you try to pass coefficients as anything other than a dense
vector it actually throws an error! Any idea why? Anyone know a reason
these aggregators *must* store their data densely, or is just an
implementation choice? Perhaps refactoring these classes to store data
sparsely would fix the issue.

On Wed, Mar 9, 2016 at 7:57 AM, Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> Hi Daniel
>
> The bottleneck in Spark ML is most likely (a) the fact that the weight
> vector itself is dense, and (b) the related communication via the driver. A
> tree aggregation mechanism is used for computing gradient sums (see
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala#L1080
>  and
> https://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html),
> which helps efficiency, but ultimately the driver must collect that
> gradient vector and re-broadcast the updated weight vector on every
> iteration.
>
> From a quick glance, MultivariateOnlineSummarizer doesn't seem optimized
> to sparse data either (though that is only one pass of the dataset so
> doubtful it adds too much overhead).
>
> It would be helpful to understand some further details:
> 1. Some more exact timing numbers - if you could provide screenshots /
> output from the UI to indicate the stages and call sites where the time is
> being spent that would be really useful
> 2. Is this part of a pipeline, and if so, what is the contribution of
> other parts of that pipeline to overall runtime?
> 3. Some stats on input / output data sizes from the critical stages (again
> from the UI)
> 4. The dataset size (# examples, avg sparsity % per example, etc)
> 5. Related to (4), the number of partitions of your dataset
> 6. Cluster details (# nodes and spec), as well as Spark version
>
> If you have a lot of partitions, you could find performance will be better
> with fewer partitions because the communication overhead will tend to
> dominate the overall runtime.
>
> Still, 10 hours and >100GB of driver memory seems extreme for a 20 million
> size dense weight vector (which should only be a few 100MB memory), so
> perhaps something else is going on.
>
> Nick
>
> On Tue, 8 Mar 2016 at 22:55 Daniel Siegmann <daniel.siegm...@teamaol.com>
> wrote:
>
>> Just for the heck of it I tried the old MLlib implementation, but it had
>> the same scalability problem.
>>
>> Anyone familiar with the logistic regression implementation who could
>> weigh in?
>>
>> On Mon, Mar 7, 2016 at 5:35 PM, Michał Zieliński <
>> zielinski.mich...@gmail.com> wrote:
>>
>>> We're using SparseVector columns in a DataFrame, so they are definitely
>>> supported. But maybe for LR some implicit magic is happening inside.
>>>
>>> On 7 March 2016 at 23:04, Devin Jones <devin.jo...@columbia.edu> wrote:
>>>
>>>> I could be wrong but its possible that toDF populates a dataframe which
>>>> I understand do not support sparsevectors at the moment.
>>>>
>>>> If you use the MlLib logistic regression implementation (not ml) you
>>>> can pass the RDD[LabeledPoint] data type directly to the learner.
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classificatio

Re: Spark ML - Scaling logistic regression for many features

2016-03-08 Thread Daniel Siegmann
Just for the heck of it I tried the old MLlib implementation, but it had
the same scalability problem.

Anyone familiar with the logistic regression implementation who could weigh
in?

On Mon, Mar 7, 2016 at 5:35 PM, Michał Zieliński <
zielinski.mich...@gmail.com> wrote:

> We're using SparseVector columns in a DataFrame, so they are definitely
> supported. But maybe for LR some implicit magic is happening inside.
>
> On 7 March 2016 at 23:04, Devin Jones <devin.jo...@columbia.edu> wrote:
>
>> I could be wrong but its possible that toDF populates a dataframe which I
>> understand do not support sparsevectors at the moment.
>>
>> If you use the MlLib logistic regression implementation (not ml) you can
>> pass the RDD[LabeledPoint] data type directly to the learner.
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>>
>> Only downside is that you can't use the pipeline framework from spark ml.
>>
>> Cheers,
>> Devin
>>
>>
>>
>> On Mon, Mar 7, 2016 at 4:54 PM, Daniel Siegmann <
>> daniel.siegm...@teamaol.com> wrote:
>>
>>> Yes, it is a SparseVector. Most rows only have a few features, and all
>>> the rows together only have tens of thousands of features, but the vector
>>> size is ~ 20 million because that is the largest feature.
>>>
>>> On Mon, Mar 7, 2016 at 4:31 PM, Devin Jones <devin.jo...@columbia.edu>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Which data structure are you using to train the model? If you haven't
>>>> tried yet, you should consider the SparseVector
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
>>>>
>>>>
>>>> On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann <
>>>> daniel.siegm...@teamaol.com> wrote:
>>>>
>>>>> I recently tried to a model using
>>>>> org.apache.spark.ml.classification.LogisticRegression on a data set
>>>>> where the feature vector size was around ~20 million. It did *not* go
>>>>> well. It took around 10 hours to train on a substantial cluster.
>>>>> Additionally, it pulled a lot data back to the driver - I eventually set 
>>>>> --conf
>>>>> spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
>>>>> submitting.
>>>>>
>>>>> Attempting the same application on the same cluster with the feature
>>>>> vector size reduced to 100k took only ~ 9 minutes. Clearly there is an
>>>>> issue with scaling to large numbers of features. I'm not doing anything
>>>>> fancy in my app, here's the relevant code:
>>>>>
>>>>> val lr = new LogisticRegression().setRegParam(1)
>>>>> val model = lr.fit(trainingSet.toDF())
>>>>>
>>>>> In comparison, a coworker trained a logistic regression model on her
>>>>> *laptop* using the Java library liblinear in just a few minutes.
>>>>> That's with the ~20 million-sized feature vectors. This suggests to me
>>>>> there is some issue with Spark ML's implementation of logistic regression
>>>>> which is limiting its scalability.
>>>>>
>>>>> Note that my feature vectors are *very* sparse. The maximum feature
>>>>> is around 20 million, but I think there are only 10's of thousands of
>>>>> features.
>>>>>
>>>>> Has anyone run into this? Any idea where the bottleneck is or how this
>>>>> problem might be solved?
>>>>>
>>>>> One solution of course is to implement some dimensionality reduction.
>>>>> I'd really like to avoid this, as it's just another thing to deal with -
>>>>> not so hard to put it into the trainer, but then anything doing scoring
>>>>> will need the same logic. Unless Spark ML supports this out of the box? An
>>>>> easy way to save / load a model along with the dimensionality reduction
>>>>> logic so when transform is called on the model it will handle the
>>>>> dimensionality reduction transparently?
>>>>>
>>>>> Any advice would be appreciated.
>>>>>
>>>>> ~Daniel Siegmann
>>>>>
>>>>
>>>>
>>>
>>
>


Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Daniel Siegmann
I recently tried to a model using
org.apache.spark.ml.classification.LogisticRegression on a data set where
the feature vector size was around ~20 million. It did *not* go well. It
took around 10 hours to train on a substantial cluster. Additionally, it
pulled a lot data back to the driver - I eventually set --conf
spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
submitting.

Attempting the same application on the same cluster with the feature vector
size reduced to 100k took only ~ 9 minutes. Clearly there is an issue with
scaling to large numbers of features. I'm not doing anything fancy in my
app, here's the relevant code:

val lr = new LogisticRegression().setRegParam(1)
val model = lr.fit(trainingSet.toDF())

In comparison, a coworker trained a logistic regression model on her
*laptop* using the Java library liblinear in just a few minutes. That's
with the ~20 million-sized feature vectors. This suggests to me there is
some issue with Spark ML's implementation of logistic regression which is
limiting its scalability.

Note that my feature vectors are *very* sparse. The maximum feature is
around 20 million, but I think there are only 10's of thousands of features.

Has anyone run into this? Any idea where the bottleneck is or how this
problem might be solved?

One solution of course is to implement some dimensionality reduction. I'd
really like to avoid this, as it's just another thing to deal with - not so
hard to put it into the trainer, but then anything doing scoring will need
the same logic. Unless Spark ML supports this out of the box? An easy way
to save / load a model along with the dimensionality reduction logic so
when transform is called on the model it will handle the dimensionality
reduction transparently?

Any advice would be appreciated.

~Daniel Siegmann


Re: Serializing collections in Datasets

2016-03-03 Thread Daniel Siegmann
I have confirmed this is fixed in Spark 1.6.1 RC 1. Thanks.

On Tue, Feb 23, 2016 at 1:32 PM, Daniel Siegmann <
daniel.siegm...@teamaol.com> wrote:

> Yes, I will test once 1.6.1 RC1 is released. Thanks.
>
> On Mon, Feb 22, 2016 at 6:24 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I think this will be fixed in 1.6.1.  Can you test when we post the first
>> RC? (hopefully later today)
>>
>> On Mon, Feb 22, 2016 at 1:51 PM, Daniel Siegmann <
>> daniel.siegm...@teamaol.com> wrote:
>>
>>> Experimenting with datasets in Spark 1.6.0 I ran into a serialization
>>> error when using case classes containing a Seq member. There is no
>>> problem when using Array instead. Nor is there a problem using RDD or
>>> DataFrame (even if converting the DF to a DS later).
>>>
>>> Here's an example you can test in the Spark shell:
>>>
>>> import sqlContext.implicits._
>>>
>>> case class SeqThing(id: String, stuff: Seq[Int])
>>> val seqThings = Seq(SeqThing("A", Seq()))
>>> val seqData = sc.parallelize(seqThings)
>>>
>>> case class ArrayThing(id: String, stuff: Array[Int])
>>> val arrayThings = Seq(ArrayThing("A", Array()))
>>> val arrayData = sc.parallelize(arrayThings)
>>>
>>>
>>> // Array works fine
>>> arrayData.collect()
>>> arrayData.toDF.as[ArrayThing]
>>> arrayData.toDS
>>>
>>> // Seq can't convert directly to DS
>>> seqData.collect()
>>> seqData.toDF.as[SeqThing]
>>> seqData.toDS // Serialization exception
>>>
>>> Is this working as intended? Are there plans to support serializing
>>> arbitrary Seq values in datasets, or must everything be converted to
>>> Array?
>>>
>>> ~Daniel Siegmann
>>>
>>
>>
>


Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-02 Thread Daniel Siegmann
In the past I have seen this happen when I filled up HDFS and some core
nodes became unhealthy. There was no longer anywhere to replicate the data.
>From your command it looks like you should have 1 master and 2 core nodes
in your cluster. Can you verify both the core nodes are healthy?

On Wed, Mar 2, 2016 at 6:01 AM, Oleg Ruchovets  wrote:

> Here is my command:
>aws emr create-cluster --release-label emr-4.3.0 --name "ClusterJava8"
> --use-default-roles   --applications  Name=Ganglia Name=Hive Name=Hue
> Name=Mahout Name=Pig  Name=Spark  --ec2-attributes KeyName=CC-ES-Demo
>  --instance-count 3 --instance-type m3.xlarge  --use-default-roles
> --bootstrap-action Path=s3://crayon-emr-scripts/emr_java_8.sh
>
> I am using bootstrap script to install java 8.
>
> When I choose applications (Name=Ganglia Name=Hive Name=Hue Name=Mahout
> Name=Pig  Name=Spark) problem is gone. I fixed on the way Lzo not found
> exception. Now I have another problem that I have no idea why it happens:
> I tries to copy file to hdfs and got this exception (file is very small ,
> just couple of kb).
>
>
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of
> minReplication (=1).  There are 0 datanode(s) running and no node(s) are
> excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:238)
> at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1441)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
> put: File /input/test.txt._COPYING_ could only be replicated to 0 nodes
> instead of minReplication (=1).  There are 0 datanode(s) running and no
> node(s) are excluded in this operation.
>
>
> On Wed, Mar 2, 2016 at 4:09 AM, Gourav Sengupta  > wrote:
>
>> Hi,
>>
>> which region are you using the EMR clusters from? Is there any tweaking
>> of the HADOOP parameters that you are doing before starting the clusters?
>>
>> If you are using AWS CLI to start the cluster just send across the
>> command.
>>
>> I have, never till date, faced any such issues in the Ireland region.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Mar 1, 2016 at 9:15 AM, Oleg Ruchovets 
>> wrote:
>>
>>> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell
>>> but it looks it does't work and throws exceptions.
>>> Please advice:
>>>
>>> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
>>> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
>>> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
>>> support was removed in 8.0
>>> 

Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-01 Thread Daniel Siegmann
How many core nodes does your cluster have?

On Tue, Mar 1, 2016 at 4:15 AM, Oleg Ruchovets  wrote:

> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell but
> it looks it does't work and throws exceptions.
> Please advice:
>
> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
> support was removed in 8.0
> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
> 16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
> server' on port 47223.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>   /_/
>
> Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
> 16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver'
> on port 52143.
> 16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
> 16/03/01 09:11:54 INFO Remoting: Starting remoting
> 16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
> 16/03/01 09:11:54 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 42989.
> 16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
> 16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
> 16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at
> /mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
> 16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity
> 518.1 MB
> 16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 16/03/01 09:11:55 INFO SparkUI: Started SparkUI at
> http://172.31.39.37:4040
> 16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at /
> 172.31.39.37:8032
> 16/03/01 09:11:55 INFO Client: Requesting a new application from cluster
> with 2 NodeManagers
> 16/03/01 09:11:55 INFO Client: Verifying our application has not requested
> more than the maximum memory capability of the cluster (11520 MB per
> container)
> 16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB
> memory including 384 MB overhead
> 16/03/01 09:11:55 INFO Client: Setting up container launch context for our
> AM
> 16/03/01 09:11:55 INFO Client: Setting up the launch environment for our
> AM container
> 16/03/01 09:11:55 INFO Client: Preparing resources for our AM container
> 16/03/01 09:11:56 INFO Client: Uploading resource
> file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar ->
> hdfs://
> 172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> 16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord
> disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec:
> 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500
> lastModified: 1456818856695
> 16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver
> j-2FT6QNFSPTHNX:i-5f6bcadb:SparkSubmit:04807 period:60
> /mnt/var/em/raw/i-5f6bcadb_20160301_SparkSubmit_04807_raw.bin
> 16/03/01 09:11:56 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> could only be replicated to 0 nodes instead of minReplication (=1).  There
> are 0 datanode(s) running and no node(s) are excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
> at
> 

Re: Serializing collections in Datasets

2016-02-23 Thread Daniel Siegmann
Yes, I will test once 1.6.1 RC1 is released. Thanks.

On Mon, Feb 22, 2016 at 6:24 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think this will be fixed in 1.6.1.  Can you test when we post the first
> RC? (hopefully later today)
>
> On Mon, Feb 22, 2016 at 1:51 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> Experimenting with datasets in Spark 1.6.0 I ran into a serialization
>> error when using case classes containing a Seq member. There is no
>> problem when using Array instead. Nor is there a problem using RDD or
>> DataFrame (even if converting the DF to a DS later).
>>
>> Here's an example you can test in the Spark shell:
>>
>> import sqlContext.implicits._
>>
>> case class SeqThing(id: String, stuff: Seq[Int])
>> val seqThings = Seq(SeqThing("A", Seq()))
>> val seqData = sc.parallelize(seqThings)
>>
>> case class ArrayThing(id: String, stuff: Array[Int])
>> val arrayThings = Seq(ArrayThing("A", Array()))
>> val arrayData = sc.parallelize(arrayThings)
>>
>>
>> // Array works fine
>> arrayData.collect()
>> arrayData.toDF.as[ArrayThing]
>> arrayData.toDS
>>
>> // Seq can't convert directly to DS
>> seqData.collect()
>> seqData.toDF.as[SeqThing]
>> seqData.toDS // Serialization exception
>>
>> Is this working as intended? Are there plans to support serializing
>> arbitrary Seq values in datasets, or must everything be converted to
>> Array?
>>
>> ~Daniel Siegmann
>>
>
>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Daniel Siegmann
During testing you will typically be using some finite data. You want the
stream to shut down automatically when that data has been consumed so your
test shuts down gracefully.

Of course once the code is running in production you'll want it to keep
waiting for new records. So whether the stream shuts down when there's no
more data should be configurable.



On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar 
wrote:

> Just out of curiosity I will like to know why a streaming program should
> shutdown when no new data is arriving?  I think it should keep waiting for
> arrival of new records.
>
> Thanks
> Ashutosh
>
> On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat 
> wrote:
>
>> A guess - parseRecord is returning None in some case (probaly empty
>> lines). And then entry.get is throwing the exception.
>>
>> You may want to filter the None values from accessLogDStream before you
>> run the map function over it.
>>
>> Hemant
>>
>> Hemant Bhanawat 
>> www.snappydata.io
>>
>> On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu  wrote:
>>
>>> Which line is line 42 in your code ?
>>>
>>> When variable lines becomes empty, you can stop your program.
>>>
>>> Cheers
>>>
>>> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
>>>
>>> I am working on Spark Streaming API and I wish to stream a set of
>>> pre-downloaded web log files continuously to simulate a real-time stream. I
>>> wrote a script that gunzips the compressed logs and pipes the output to nc
>>> on port .
>>>
>>> The script looks like this:
>>>
>>> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
>>> zipped_files=`find $BASEDIR -name "*.gz"`
>>>
>>> for zfile in $zipped_files
>>>  do
>>>   echo "Unzipping $zfile..."
>>>   gunzip -c $zfile  | nc -l -p  -q 20
>>>
>>>  done
>>>
>>> I have streaming code written in Scala that processes the streams. It
>>> works well for the most part, but when its run out of files to stream I get
>>> the following error in Spark:
>>>
>>> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
>>> Restarting receiver with delay 2000 ms: Socket data stream had no more data
>>> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
>>> Restarting receiver with delay 2000ms: Socket data stream had no more data
>>> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
>>> to only 0 peer(s) instead of 1 peers
>>> 
>>> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 
>>> 47)
>>> java.util.NoSuchElementException: None.get
>>> at scala.None$.get(Option.scala:313)
>>> at scala.None$.get(Option.scala:311)
>>> at 
>>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>> at 
>>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>>
>>> How to I implement a graceful shutdown so that the program exits
>>> gracefully when it no longer detects any data in the stream ?
>>>
>>> My Spark Streaming code looks like this:
>>>
>>> object StreamingLogEnhanced {
>>>  def main(args: Array[String]) {
>>>   val master = args(0)
>>>   val conf = new
>>>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>>>  // Create a StreamingContext with a n second batch size
>>>   val ssc = new StreamingContext(conf, Seconds(10))
>>>  // Create a DStream from all the input on port 
>>>   val log = Logger.getLogger(getClass.getName)
>>>
>>>   sys.ShutdownHookThread {
>>>   log.info("Gracefully stopping Spark Streaming Application")
>>>   ssc.stop(true, true)
>>>   log.info("Application stopped")
>>>   }
>>>   val lines = ssc.socketTextStream("localhost", )
>>>   // Create a count of log hits by ip
>>>   var ipCounts=countByIp(lines)
>>>   ipCounts.print()
>>>
>>>   // start our streaming context and wait for it to "finish"
>>>   ssc.start()
>>>   // Wait for 600 seconds then exit
>>>   ssc.awaitTermination(1*600)
>>>   ssc.stop()
>>>   }
>>>
>>>  def countByIp(lines: DStream[String]) = {
>>>val parser = new AccessLogParser
>>>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>>>val ipDStream = accessLogDStream.map(entry =>
>>> (entry.get.clientIpAddress, 1))
>>>ipDStream.reduceByKey((x, y) => x + y)
>>>  }
>>>
>>> }
>>>
>>> Thanks for any suggestions in advance.
>>>
>>>
>>
>


Serializing collections in Datasets

2016-02-22 Thread Daniel Siegmann
Experimenting with datasets in Spark 1.6.0 I ran into a serialization error
when using case classes containing a Seq member. There is no problem when
using Array instead. Nor is there a problem using RDD or DataFrame (even if
converting the DF to a DS later).

Here's an example you can test in the Spark shell:

import sqlContext.implicits._

case class SeqThing(id: String, stuff: Seq[Int])
val seqThings = Seq(SeqThing("A", Seq()))
val seqData = sc.parallelize(seqThings)

case class ArrayThing(id: String, stuff: Array[Int])
val arrayThings = Seq(ArrayThing("A", Array()))
val arrayData = sc.parallelize(arrayThings)


// Array works fine
arrayData.collect()
arrayData.toDF.as[ArrayThing]
arrayData.toDS

// Seq can't convert directly to DS
seqData.collect()
seqData.toDF.as[SeqThing]
seqData.toDS // Serialization exception

Is this working as intended? Are there plans to support serializing
arbitrary Seq values in datasets, or must everything be converted to Array?

~Daniel Siegmann


Re: Is this likely to cause any problems?

2016-02-19 Thread Daniel Siegmann
With EMR supporting Spark, I don't see much reason to use the spark-ec2
script unless it is important for you to be able to launch clusters using
the bleeding edge version of Spark. EMR does seem to do a pretty decent job
of keeping up to date - the latest version (4.3.0) supports the latest
Spark version (1.6.0).

So I'd flip the question around and ask: is there any reason to continue
using the spark-ec2 script rather than EMR?

On Thu, Feb 18, 2016 at 11:39 AM, James Hammerton  wrote:

> I have now... So far  I think the issues I've had are not related to this,
> but I wanted to be sure in case it should be something that needs to be
> patched. I've had some jobs run successfully but this warning appears in
> the logs.
>
> Regards,
>
> James
>
> On 18 February 2016 at 12:23, Ted Yu  wrote:
>
>> Have you seen this ?
>>
>> HADOOP-10988
>>
>> Cheers
>>
>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>
>>> HI,
>>>
>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>
>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>> disabled stack guard. The VM will try to fix the stack guard now.
>>> It's highly recommended that you fix the library with 'execstack -c 
>>> ', or link it with '-z noexecstack'.
>>>
>>>
>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>> is m4.large.
>>>
>>> Could this contribute to any problems running the jobs?
>>>
>>> Regards,
>>>
>>> James
>>>
>>
>>
>


Re: Spark 2.0.0 release plan

2016-01-27 Thread Daniel Siegmann
Will there continue to be monthly releases on the 1.6.x branch during the
additional time for bug fixes and such?

On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers  wrote:

> thanks thats all i needed
>
> On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen  wrote:
>
>> I think it will come significantly later -- or else we'd be at code
>> freeze for 2.x in a few days. I haven't heard anyone discuss this
>> officially but had batted around May or so instead informally in
>> conversation. Does anyone have a particularly strong opinion on that?
>> That's basically an extra 3 month period.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
>>
>> On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
>> wrote:
>> > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
>> > quarterly release as usual?
>> > Thanks
>>
>
>


Re: Too many tasks killed the scheduler

2016-01-12 Thread Daniel Siegmann
As I understand it, your initial number of partitions will always depend on
the initial data. I'm not aware of any way to change this, other than
changing the configuration of the underlying data store.

Have you tried reading the data in several data frames (e.g. one data frame
per day), coalescing each data frame, and *then* unioning them? You could
try with and without a shuffle. Not sure if it'll work, but might be worth
a shot.

On Mon, Jan 11, 2016 at 8:39 PM, Gavin Yue  wrote:

> Thank you for the suggestion.
>
> I tried the df.coalesce(1000).write.parquet() and yes, the parquet file
> number drops to 1000, but the parition of parquet stills is like 5000+.
> When I read the parquet and do a count, it still has the 5000+ tasks.
>
> So I guess I need to do a repartition here to drop task number?  But
> repartition never works for me, always failed due to out of memory.
>
> And regarding the large number task delay problem, I found a similar
> problem: https://issues.apache.org/jira/browse/SPARK-7447.
>
> I am unionALL like 10 parquet folder, with totally 70K+ parquet files,
> generating 70k+ taskes. It took around 5-8 mins before all tasks start just
> like the ticket abover.
>
> It also happens if I do a partition discovery with base path.Is there
> any schema inference or checking doing, which causes the slowness?
>
> Thanks,
> Gavin
>
>
>
> On Mon, Jan 11, 2016 at 1:21 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you use "coalesce" to reduce the number of partitions?
>>
>>
>> Shixiong Zhu
>>
>>
>> On Mon, Jan 11, 2016 at 12:21 AM, Gavin Yue 
>> wrote:
>>
>>> Here is more info.
>>>
>>> The job stuck at:
>>> INFO cluster.YarnScheduler: Adding task set 1.0 with 79212 tasks
>>>
>>> Then got the error:
>>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
>>> after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
>>>
>>> So I increased spark.network.timeout from 120s to 600s.  It sometimes
>>> works.
>>>
>>> Each task is a parquet file.  I could not repartition due to out of GC
>>> problem.
>>>
>>> Is there any way I could to improve the performance?
>>>
>>> Thanks,
>>> Gavin
>>>
>>>
>>> On Sun, Jan 10, 2016 at 1:51 AM, Gavin Yue 
>>> wrote:
>>>
 Hey,

 I have 10 days data, each day has a parquet directory with over 7000
 partitions.
 So when I union 10 days and do a count, then it submits over 70K tasks.

 Then the job failed silently with one container exit with code 1.  The
 union with like 5, 6 days data is fine.
 In the spark-shell, it just hang showing: Yarn scheduler submit 7+
 tasks.

 I am running spark 1.6 over hadoop 2.7.  Is there any setting I could
 change to make this work?

 Thanks,
 Gavin



>>>
>>
>


Zip data frames

2015-12-29 Thread Daniel Siegmann
RDD has methods to zip with another RDD or with an index, but there's no
equivalent for data frames. Anyone know a good way to do this?

I thought I could just convert to RDD, do the zip, and then convert back,
but ...

   1. I don't see a way (outside developer API) to convert RDD[Row]
   directly back to DataFrame. Is there really no way to do this?
   2. I don't see any way to modify Row objects or create new rows with
   additional columns. In other words, no way to convert RDD[(Row, Row)] to
   RDD[Row]

It seems the only way to get what I want is to extract out the data into a
case class and then convert back to a data frame. Did I miss something?


Re: DataFrame Vs RDDs ... Which one to use When ?

2015-12-28 Thread Daniel Siegmann
DataFrames are a higher level API for working with tabular data - RDDs are
used underneath. You can use either and easily convert between them in your
code as necessary.

DataFrames provide a nice abstraction for many cases, so it may be easier
to code against them. Though if you're used to thinking in terms of
collections rather than tables, you may find RDDs more natural. Data frames
can also be faster, since Spark will do some optimizations under the hood -
if you are using PySpark, this will avoid the overhead. Data frames may
also perform better if you're reading structured data, such as a Hive table
or Parquet files.

I recommend you prefer data frames, switching over to RDDs as necessary
(when you need to perform an operation not supported by data frames / Spark
SQL).

HOWEVER (and this is a big one), Spark 1.6 will have yet another API -
datasets. The release of Spark 1.6 is currently being finalized and I would
expect it in the next few days. You will probably want to use the new API
once it's available.


On Sun, Dec 27, 2015 at 9:18 PM, Divya Gehlot 
wrote:

> Hi,
> I am new bee to spark and a bit confused about RDDs and DataFames in Spark.
> Can somebody explain me with the use cases which one to use when ?
>
> Would really appreciate the clarification .
>
> Thanks,
> Divya
>


Re: is repartition very cost

2015-12-09 Thread Daniel Siegmann
Each node can have any number of partitions. Spark will try to have a node
process partitions which are already on the node for best performance (if
you look at the list of tasks in the UI, look under the locality level
column).

As a rule of thumb, you probably want 2-3 times the number of partitions as
you have executors. This helps distribute the work evenly. You would need
to experiment to find the best number for your own case.

If you're reading from a distributed data store (such as HDFS), you should
expect the data to already be partitioned. Any time a shuffle is performed
the data will be repartitioned into a number of partitions equal to the
spark.default.parallelism setting (see
http://spark.apache.org/docs/latest/configuration.html), but most
operations which cause a shuffle also take an optional parameter to set a
different value. If using data frames, use spark.sql.shuffle.partitions.

I recommend you do not do any explicit partitioning or mess with these
values until you find a need for it. If executors are sitting idle, that's
a sign you may need to repartition.


On Tue, Dec 8, 2015 at 9:35 PM, Zhiliang Zhu 
wrote:

> Thanks very much for Yong's help.
>
> Sorry that for one more issue, is it that different partitions must be in
> different nodes? that is, each node would only have one partition, in
> cluster mode ...
>
>
>
> On Wednesday, December 9, 2015 6:41 AM, "Young, Matthew T" <
> matthew.t.yo...@intel.com> wrote:
>
>
> Shuffling large amounts of data over the network is expensive, yes. The
> cost is lower if you are just using a single node where no networking needs
> to be involved to do the repartition (using Spark as a multithreading
> engine).
>
> In general you need to do performance testing to see if a repartition is
> worth the shuffle time.
>
> A common model is to repartition the data once after ingest to achieve
> parallelism and avoid shuffles whenever possible later.
>
> *From:* Zhiliang Zhu [mailto:zchl.j...@yahoo.com.INVALID]
> *Sent:* Tuesday, December 08, 2015 5:05 AM
> *To:* User 
> *Subject:* is repartition very cost
>
>
> Hi All,
>
> I need to do optimize objective function with some linear constraints by
>  genetic algorithm.
> I would like to make as much parallelism for it by spark.
>
> repartition / shuffle may be used sometimes in it, however, is repartition
> API very cost ?
>
> Thanks in advance!
> Zhiliang
>
>
>
>
>


Re: Unit tests of spark application

2015-07-10 Thread Daniel Siegmann
On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire vmadh...@umail.iu.edu
wrote:

 I want to write junit test cases in scala for testing spark application.
 Is there any guide or link which I can refer.


https://spark.apache.org/docs/latest/programming-guide.html#unit-testing

Typically I create test data using SparkContext.parallelize and then call
RDD.collect to get the results to assert.


Re: Getting started with spark-scala developemnt in eclipse.

2015-07-08 Thread Daniel Siegmann
To set up Eclipse for Spark you should install the Scala IDE plugins:
http://scala-ide.org/download/current.html

Define your project in Maven with Scala plugins configured (you should be
able to find documentation online) and import as an existing Maven project.
The source code should be in src/main/scala but otherwise the project
structure will be the same as you'd expect in Java.

Nothing special is needed for Spark. Just define the desired Spark jars (
spark-core and possibly others, such as spark-sql) in your Maven POM as
dependencies. You should scope these dependencies as provided, since they
will automatically be on the classpath when you deploy your project to a
Spark cluster.

One thing to keep in mind is that Scala dependencies require separate jars
for different versions of Scala, and it is convention to append the Scala
version to the artifact ID. For example, if you are using Scala 2.11.x,
your dependency will be spark-core_2.11 (look on search.maven.org if you're
not sure). I think you can omit the Scala version if you're using SBT (not
sure why you would, but some people seem to prefer it).

Unit testing Spark is briefly explained in the programming guide
https://spark.apache.org/docs/latest/programming-guide.html#unit-testing.

To deploy using spark-submit you can build the jar using mvn package if and
only if you don't have any non-Spark dependencies. Otherwise, the simplest
thing is to build a jar with dependencies (typically using the assembly
http://maven.apache.org/plugins/maven-assembly-plugin/single-mojo.html or
shade https://maven.apache.org/plugins/maven-shade-plugin/ plugins).





On Wed, Jul 8, 2015 at 9:38 AM, Prateek . prat...@aricent.com wrote:

  Hi



 I am beginner to scala and spark. I am trying to set up eclipse
 environment to develop spark program  in scala, then take it’s  jar  for
 spark-submit.

 How shall I start? To start my  task includes, setting up eclipse for
 scala and spark, getting dependencies resolved, building project using
 maven/sbt.

 Is there any good blog or documentation that is can follow.



 Thanks

  DISCLAIMER: This message is proprietary to Aricent and is intended
 solely for the use of the individual to whom it is addressed. It may
 contain privileged or confidential information and should not be circulated
 or used for any purpose other than for what it is intended. If you have
 received this message in error, please notify the originator immediately.
 If you are not the intended recipient, you are notified that you are
 strictly prohibited from using, copying, altering, or disclosing the
 contents of this message. Aricent accepts no responsibility for loss or
 damage arising from the use of the information transmitted by this email
 including damage from virus.



Re: Want to avoid groupByKey as its running for ever

2015-06-30 Thread Daniel Siegmann
If the number of items is very large, have you considered using
probabilistic counting? The HyperLogLogPlus
https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java
class from stream-lib https://github.com/addthis/stream-lib might be
suitable.

On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 I have a RDD of type (String,
  
 Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
 com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]

 Here String is Key and a list of tuples for that key. I got above RDD
 after doing a groupByKey. I later want to compute total number of values
 for a given key and total number of unique values for the same given key
 and hence i do this

 val totalViCount = details.size.toLong
 val uniqueViCount =
 details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong

 How do i do this using reduceByKey.

 *Total Code:*

   val groupedDetail: RDD[(String, Iterable[(DetailInputRecord,
 DataRecord)])] = detailInputsToGroup.map {
 case (detailInput, dataRecord) =
   val key: StringBuilder = new StringBuilder
   dimensions.foreach {
 dimension =
   key ++= {

 Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse()).toString
   }
   }
   (key.toString, (detailInput, dataRecord))
   }.groupByKey

   groupedDetail.map {
 case (key, values) = {
   val valueList = values.toList

   //Compute dimensions // You can skup this
   val (detailInput, dataRecord) = valueList.head
   val schema = SchemaUtil.outputSchema(_detail)
   val detailOutput = new DetailOutputRecord(detail, new
 SessionRecord(schema))
   DataUtil.populateDimensions(schema, dimensions.toArray,
 detailInput, dataRecord, detailOutput)


   val metricsData = metricProviders.flatMap {
 case (className, instance) =
   val data = instance.getMetrics(valueList)
   ReflectionUtil.getData(data,
 _metricProviderMemberNames(className))
   }
   metricsData.map { case (k, v) = detailOutput.put(k, v) }
   val wrap = new AvroKey[DetailOutputRecord](detailOutput)
   (wrap, NullWritable.get)
 }
   }


 //getMetrics:
   def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
 val totalViCount = details.size.toLong
 val uniqueViCount =
 details.map(_._1.get(itemId).asInstanceOf[Long]).distinct.size.toLong
 new ViewItemCountMetric(totalViCount, uniqueViCount)
   }


 I understand that totalViCount can be implemented using reduceByKey. How
 can i implement total unique count as i need to have the full list to know
 the unique values.

 --
 Deepak




Re: Unit testing with HiveContext

2015-04-09 Thread Daniel Siegmann
Thanks Ted, using HiveTest as my context worked. It still left a metastore
directory and Derby log in my current working directory though; I manually
added a shutdown hook to delete them and all was well.

On Wed, Apr 8, 2015 at 4:33 PM, Ted Yu yuzhih...@gmail.com wrote:

 Please take a look at
 sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala :

   protected def configure(): Unit = {
 warehousePath.delete()
 metastorePath.delete()
 setConf(javax.jdo.option.ConnectionURL,
   sjdbc:derby:;databaseName=$metastorePath;create=true)
 setConf(hive.metastore.warehouse.dir, warehousePath.toString)
   }

 Cheers

 On Wed, Apr 8, 2015 at 1:07 PM, Daniel Siegmann 
 daniel.siegm...@teamaol.com wrote:

 I am trying to unit test some code which takes an existing HiveContext
 and uses it to execute a CREATE TABLE query (among other things).
 Unfortunately I've run into some hurdles trying to unit test this, and I'm
 wondering if anyone has a good approach.

 The metastore DB is automatically created in the local directory, but it
 doesn't seem to be cleaned up afterward. Is there any way to get Spark to
 clean this up when the context is stopped? Or can I point this to some
 other location, such as a temp directory?

 Trying to create a table fails because it is using the default warehouse
 directory (/user/hive/warehouse). Is there some way to change this without
 hard-coding a directory in a hive-site.xml; again, I'd prefer to point it
 to a temp directory so it will be automatically removed. I tried a couple
 of things that didn't work:

- hiveContext.sql(SET hive.metastore.warehouse.dir=/tmp/dir/xyz)
- hiveContext.setConf(hive.metastore.warehouse.dir, /tmp/dir/xyz)

 Any advice from those who have been here before would be appreciated.





Unit testing with HiveContext

2015-04-08 Thread Daniel Siegmann
I am trying to unit test some code which takes an existing HiveContext and
uses it to execute a CREATE TABLE query (among other things). Unfortunately
I've run into some hurdles trying to unit test this, and I'm wondering if
anyone has a good approach.

The metastore DB is automatically created in the local directory, but it
doesn't seem to be cleaned up afterward. Is there any way to get Spark to
clean this up when the context is stopped? Or can I point this to some
other location, such as a temp directory?

Trying to create a table fails because it is using the default warehouse
directory (/user/hive/warehouse). Is there some way to change this without
hard-coding a directory in a hive-site.xml; again, I'd prefer to point it
to a temp directory so it will be automatically removed. I tried a couple
of things that didn't work:

   - hiveContext.sql(SET hive.metastore.warehouse.dir=/tmp/dir/xyz)
   - hiveContext.setConf(hive.metastore.warehouse.dir, /tmp/dir/xyz)

Any advice from those who have been here before would be appreciated.


Re: Setup Spark jobserver for Spark SQL

2015-04-02 Thread Daniel Siegmann
You shouldn't need to do anything special. Are you using a named context?
I'm not sure those work with SparkSqlJob.

By the way, there is a forum on Google groups for the Spark Job Server:
https://groups.google.com/forum/#!forum/spark-jobserver

On Thu, Apr 2, 2015 at 5:10 AM, Harika matha.har...@gmail.com wrote:

 Hi,

 I am trying to Spark Jobserver(
 https://github.com/spark-jobserver/spark-jobserver
 https://github.com/spark-jobserver/spark-jobserver  ) for running Spark
 SQL jobs.

 I was able to start the server but when I run my application(my Scala class
 which extends SparkSqlJob), I am getting the following as response:

 {
   status: ERROR,
   result: Invalid job type for this context
 }

 Can any one suggest me what is going wrong or provide a detailed procedure
 for setting up jobserver for SparkSQL?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Spark-jobserver-for-Spark-SQL-tp22352.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Partitioning Dataset and Using Reduce in Apache Spark

2015-03-13 Thread Daniel Siegmann
On Thu, Mar 12, 2015 at 1:45 AM, raghav0110...@gmail.com wrote:


 In your response you say “When you call reduce and *similar *methods,
 each partition can be reduced in parallel. Then the results of that can be
 transferred across the network and reduced to the final result”. By similar
 methods do you mean all actions within spark? Does transfer of data from
 worker nodes to driver nodes happen only when an action is performed?


There is reduceByKey and some others that are in a more generalized form.
Other transformations and actions may work somewhat differently, but they
will generally be parallelized as much as is possible.

If you look at the UI when the job is running, you will see some number of
tasks. Each task corresponds to a single partition.

Not all actions cause data to be transferred from worker nodes to the
driver (there is only one node) - saveAsTextFile, for example. In any case,
no processing is done and no data is transferred anywhere until an action
is invoked, since transformations are lazy.


 I am assuming that in Spark, you typically have a set of transformations
 followed by some sort of action. The RDD is partitioned and sent to
 different worker nodes(assuming this a cluster setup), the transformations
 are applied to the RDD partitions at the various worker nodes, and then
 when an action is performed, you perform the action on the worker nodes and
 then aggregate the partial results at the driver and then perform another
 reduction at the driver to obtain the overall results. I would also assume
 that deciding whether the action should be done on a worker node, depends
 on the type of action. For example, performing reduce at the worker node
 makes sense, while it doesn't make sense to save the file at the worker
 node.  Does that sound correct, or am I misinterpreting something?


On the contrary, saving of files would typically be done at the worker
node. If you are handling anything approaching Big Data you will be
keeping it in a distributed store (typically HDFS, though it depends on
your use case), and each worker will write into this store. For example, if
you saveAsTextFile you will see multiple part-* files in the output
directory from separate partitions (don't worry about combining them for
future processing, sc.textFile can read them all in as a single RDD with
the data appropriately partitioned).

Some actions do pull data back to the driver - collect, for example. You
need to be careful when using such methods that you can be sure the amount
of data won't be too large for your driver.

In general you should avoid pulling any data back to the driver or doing
any processing on the driver as that will not scale. In some cases it may
be useful; for example if you wanted to join a large data set with a small
data set, it might perform better to collect the small data set and
broadcast it.

Take this for example:

sc.textFile(inputPath).map(...).filter(...).saveAsTextFile(outputPath)

None of that will execute on the driver. The map and filter will be
collapsed into a single stage (which you will in the UI). Each worker will
prefer to read its local data (but data may be transferred if there's none
locally to work on), transform the data, and write it out.

If you had this for example:

sc.textFile(inputPath).map(...).reduceByKey(...).saveAsTextFile(outputPath)


You will have two stages, since reduceByKey causes a shuffle - data will be
transferred across the network and formed into a new set of partitions. But
after the reduce, each worker will still save the data it is responsible
for. (You can provide a custom partitioner, but don't do that unless you
feel you have a good reason to do so.)

Hope that helps.


Re: Which is more efficient : first join three RDDs and then do filtering or vice versa?

2015-03-12 Thread Daniel Siegmann
Join causes a shuffle (sending data across the network). I expect it will
be better to filter before you join, so you reduce the amount of data which
is sent across the network.

Note this would be true for *any* transformation which causes a shuffle. It
would not be true if you're combining RDDs with union, since that doesn't
cause a shuffle.

On Thu, Mar 12, 2015 at 11:04 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Probably this question is already answered sometime in the mailing list,
 but i couldn't find it. Sorry for posting this again.

 I need to to join and apply filtering on three different RDDs, I just
 wonder which of the following alternatives are more efficient:
 1- first joint all three RDDs and then do  filtering on resulting joint
 RDD   or
 2- Apply filtering on each individual RDD and then join the resulting RDDs


 Or probably there is no difference due to lazy evaluation and under
 beneath Spark optimisation?

 best,
 /Shahab



Re: Partitioning Dataset and Using Reduce in Apache Spark

2015-03-05 Thread Daniel Siegmann
An RDD is a Resilient *Distributed* Data set. The partitioning and
distribution of the data happens in the background. You'll occasionally
need to concern yourself with it (especially to get good performance), but
from an API perspective it's mostly invisible (some methods do allow you to
specify a number of partitions).

When you call sc.textFile(myPath) or similar, you get an RDD. That RDD will
be composed of a bunch of partitions, but you don't really need to worry
about that. The partitioning will be based on how the data is stored. When
you call a method that causes a shuffle (such as reduce), the data is
repartitioned into a number of partitions based on your default parallelism
setting (which IIRC is based on your number of cores if you haven't set it
explicitly).

When you call reduce and similar methods, each partition can be reduced in
parallel. Then the results of that can be transferred across the network
and reduced to the final result. *You supply the function and Spark handles
the parallel execution of that function*.

I hope this helps clear up your misconceptions. You might also want to
familiarize yourself with the collections API in Java 8 (or Scala, or
Python, or pretty much any other language with lambda expressions), since
RDDs are meant to have an API that feels similar.

On Thu, Mar 5, 2015 at 9:45 AM, raggy raghav0110...@gmail.com wrote:

 I am trying to use Apache spark to load up a file, and distribute the file
 to
 several nodes in my cluster and then aggregate the results and obtain them.
 I don't quite understand how to do this.

 From my understanding the reduce action enables Spark to combine the
 results
 from different nodes and aggregate them together. Am I understanding this
 correctly?

 From a programming perspective, I don't understand how I would code this
 reduce function.

 How exactly do I partition the main dataset into N pieces and ask them to
 be
 parallel processed by using a list of transformations?

 reduce is supposed to take in two elements and a function for combining
 them. Are these 2 elements supposed to be RDDs from the context of Spark or
 can they be any type of element? Also, if you have N different partitions
 running parallel, how would reduce aggregate all their results into one
 final result(since the reduce function aggregates only 2 elements)?

 Also, I don't understand this example. The example from the spark website
 uses reduce, but I don't see the data being processed in parallel. So, what
 is the point of the reduce? If I could get a detailed explanation of the
 loop in this example, I think that would clear up most of my questions.

 class ComputeGradient extends FunctionDataPoint, Vector {
   private Vector w;
   ComputeGradient(Vector w) { this.w = w; }
   public Vector call(DataPoint p) {
 return p.x.times(p.y * (1 / (1 + Math.exp(w.dot(p.x))) - 1));
   }
 }

 JavaRDDDataPoint points = spark.textFile(...).map(new
 ParsePoint()).cache();
 Vector w = Vector.random(D); // current separating plane
 for (int i = 0; i  ITERATIONS; i++) {
   Vector gradient = points.map(new ComputeGradient(w)).reduce(new
 AddVectors());
   w = w.subtract(gradient);
 }
 System.out.println(Final separating plane:  + w);

 Also, I have been trying to find the source code for reduce from the Apache
 Spark Github, but the source is pretty huge and I haven't been able to
 pinpoint it. Could someone please direct me towards which file I could find
 it in?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Partitioning-Dataset-and-Using-Reduce-in-Apache-Spark-tp21933.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkSQL production readiness

2015-03-02 Thread Daniel Siegmann
OK, good to know data frames are still experimental. Thanks Michael.

On Mon, Mar 2, 2015 at 12:37 PM, Michael Armbrust mich...@databricks.com
wrote:

 We have been using Spark SQL in production for our customers at Databricks
 for almost a year now.  We also know of some very large production
 deployments elsewhere.  It is still a young project, but I wouldn't call it
 alpha.

 The primary changes to the API are the addition of the DataFrame
 interface, which is an expansion of the DSL that was already there.  All of
 the SQL / HiveQL stuff remains unchanged, as well as the internal execution
 engine.  DataFrames are still marked experimental, since as you said, we
 should let people use them before cementing them.



Re: Filtering keys after map+combine

2015-02-19 Thread Daniel Siegmann
I'm not sure what your use case is, but perhaps you could use mapPartitions
to reduce across the individual partitions and apply your filtering. Then
you can finish with a reduceByKey.

On Thu, Feb 19, 2015 at 9:21 AM, Debasish Das debasish.da...@gmail.com
wrote:

 Hi,

 Before I send out the keys for network shuffle, in reduceByKey after map +
 combine are done, I would like to filter the keys based on some threshold...

 Is there a way to get the key, value after map+combine stages so that I
 can run a filter on the keys ?

 Thanks.
 Deb




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Escape commas in file names

2014-12-26 Thread Daniel Siegmann
Thanks for the replies. Hopefully this will not be too difficult to fix.

Why not support multiple paths by overloading the parquetFile method to
take a collection of strings? That way we don't need an appropriate
delimiter.

On Thu, Dec 25, 2014 at 3:46 AM, Cheng, Hao hao.ch...@intel.com wrote:

  I’ve created a jira issue for this
 https://issues.apache.org/jira/browse/SPARK-4967



 Originally we want to support multiple parquet file paths scanning as I
 guess, and those file paths are in a single string separated by comma
 internally, however I didn’t find any public example says we support
 multiple parquet files for API sqlContext.parquetFile, we need to think how
 to support multiple paths in some other way.



 Cheng Hao



Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Siegmann
You want to use RDD.union (or SparkContext.union for many RDDs). These
don't join on a key. Union doesn't really do anything itself, so it is low
overhead. Note that the combined RDD will have all the partitions of the
original RDDs, so you may want to coalesce after the union.

val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
val z = x.union(y)

z.collect
res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))


On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith person.of.b...@gmail.com
wrote:

 Say I have two RDDs with the following values

 x = [(1, 3), (2, 4)]

 and

 y = [(3, 5), (4, 7)]

 and I want to have

 z = [(1, 3), (2, 4), (3, 5), (4, 7)]

 How can I achieve this. I know you can use outerJoin followed by map to
 achieve this, but is there a more direct way for this.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Siegmann
Harihar, your question is the opposite of what was asked. In the future,
please start a new thread for new questions.

You want to do a join in your case. The join function does an inner join,
which I think is what you want because you stated your IDs are common in
both RDDs. For other cases you can look at leftOuterJoin, rightOuterJoin,
and cogroup (alias groupWith). These are all on PairRDDFunctions (in Scala)
[1]

Alternatively, if one of your RDDs is small, you could collect it and
broadcast the collection, using it in functions on the other RDD. But I
don't think this will apply in your case because the number of records will
be equal in each RDD. [2]

[1]
http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
[2]
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

On Thu, Nov 20, 2014 at 4:53 PM, Harihar Nahak hna...@wynyardgroup.com
wrote:

 I've similar type of issue, want to join two different type of RDD in one
 RDD

 file1.txt content (ID, counts)
 val x : RDD[Long, Int] = sc.textFile(file1.txt).map( line =
 line.split(,)).map(row = (row(0).toLong, row(1).toInt)
 [(4407 ,40),
 (2064, 38),
 (7815 ,10),
 (5736,17),
 (8031,3)]

 Second RDD from : file2.txt contains (ID, name)
 val y: RDD[(Long, String)]{where ID is common in both the RDDs}
 [(4407 ,Jhon),
 (2064, Maria),
 (7815 ,Casto),
 (5736,Ram),
 (8031,XYZ)]

 and I'm expecting result should be like this : [(ID, Name, Count)]
 [(4407 ,Jhon, 40),
 (2064, Maria, 38),
 (7815 ,Casto, 10),
 (5736,Ram, 17),
 (8031,XYZ, 3)]


 Any help will really appreciate. Thanks




 On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] [hidden
 email] http://user/SendEmail.jtp?type=nodenode=19423i=0 wrote:

 You want to use RDD.union (or SparkContext.union for many RDDs). These
 don't join on a key. Union doesn't really do anything itself, so it is low
 overhead. Note that the combined RDD will have all the partitions of the
 original RDDs, so you may want to coalesce after the union.

 val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
 val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
 val z = x.union(y)

 z.collect
 res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))


 On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith [hidden email]
 http://user/SendEmail.jtp?type=nodenode=19419i=0 wrote:

 Say I have two RDDs with the following values

 x = [(1, 3), (2, 4)]

 and

 y = [(3, 5), (4, 7)]

 and I want to have

 z = [(1, 3), (2, 4), (3, 5), (4, 7)]

 How can I achieve this. I know you can use outerJoin followed by map to
 achieve this, but is there a more direct way for this.




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: [hidden email] http://user/SendEmail.jtp?type=nodenode=19419i=1
 W: www.velos.io


 --
  If you reply to this email, your message will be added to the
 discussion below:

 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19419.html
  To start a new topic under Apache Spark User List, email [hidden email]
 http://user/SendEmail.jtp?type=nodenode=19423i=1
 To unsubscribe from Apache Spark User List, click here.
 NAML
 http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml




 --
 Regards,
 Harihar Nahak
 BigData Developer
 Wynyard
 [hidden email] http://user/SendEmail.jtp?type=nodenode=19423i=2 |
 Extn: 8019
  --Harihar

 --
 View this message in context: Re: How to join two RDDs with mutually
 exclusive keys
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-two-RDDs-with-mutually-exclusive-keys-tp19417p19423.html
 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


PairRDDFunctions with Tuple2 subclasses

2014-11-19 Thread Daniel Siegmann
I have a class which is a subclass of Tuple2, and I want to use it with
PairRDDFunctions. However, I seem to be limited by the invariance of T in
RDD[T] (see SPARK-1296 https://issues.apache.org/jira/browse/SPARK-1296).

My Scala-fu is weak: the only way I could think to make this work would be
to define my own equivalent of PairRDDFunctions which works with my class,
does type conversions to Tuple2, and delegates to PairRDDFunctions.

Does anyone know a better way? Anyone know if there will be a significant
performance penalty with that approach?

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: PairRDDFunctions with Tuple2 subclasses

2014-11-19 Thread Daniel Siegmann
Casting to Tuple2 is easy, but the output of reduceByKey is presumably a
new Tuple2 instance so I'll need to map those to new instances of my class.
Not sure how much overhead will be added by the creation of those new
instances.

If I do that everywhere in my code though, it will make the code really
messy. That is why I was thinking of creating a wrapper which looks like
PairRDDFunctions which would cast to a pair RDD, delegate to
PairRDDFunctions, and then convert back to my class.

I was kinda hoping a Scala wizard would come along with some black magic
though.

On Wed, Nov 19, 2014 at 7:45 PM, Michael Armbrust mich...@databricks.com
wrote:

 I think you should also be able to get away with casting it back and forth
 in this case using .asInstanceOf.

 On Wed, Nov 19, 2014 at 4:39 PM, Daniel Siegmann daniel.siegm...@velos.io
  wrote:

 I have a class which is a subclass of Tuple2, and I want to use it with
 PairRDDFunctions. However, I seem to be limited by the invariance of T
 in RDD[T] (see SPARK-1296
 https://issues.apache.org/jira/browse/SPARK-1296).

 My Scala-fu is weak: the only way I could think to make this work would
 be to define my own equivalent of PairRDDFunctions which works with my
 class, does type conversions to Tuple2, and delegates to PairRDDFunctions
 .

 Does anyone know a better way? Anyone know if there will be a significant
 performance penalty with that approach?

 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to assign consecutive numeric id to each row based on its content?

2014-11-18 Thread Daniel Siegmann
I think zipWithIndex is zero-based, so if you want 1 to N, you'll need to
increment them like so:

val r2 = r1.keys.distinct().zipWithIndex().mapValues(_ + 1)

If the number of distinct keys is relatively small, you might consider
collecting them into a map and broadcasting them rather than using a join,
like so:

val keyIndices = sc.broadcast(r2.collect.toMap)
val r3 = r1.map { case (k, v) = (keyIndices(k), v) }

On Tue, Nov 18, 2014 at 8:16 AM, Cheng Lian lian.cs@gmail.com wrote:

  A not so efficient way can be this:

 val r0: RDD[OriginalRow] = ...val r1 = r0.keyBy(row = 
 extractKeyFromOriginalRow(row))val r2 = r1.keys.distinct().zipWithIndex()val 
 r3 = r2.join(r1).values

 On 11/18/14 8:54 PM, shahab wrote:

   Hi,

  In my spark application, I am loading some rows from database into Spark
 RDDs
 Each row has several fields, and a string key. Due to my requirements I
 need to work with consecutive numeric ids (starting from 1 to N, where N is
 the number of unique keys) instead of string keys . Also several rows can
 have same string key .

  In spark context, how I can map each row into (Numeric_Key, OriginalRow)
 as map/reduce  tasks such that rows with same original string key get same
 numeric consecutive key?

  Any hints?

  best,
 /Shahab

   ​




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How do you force a Spark Application to run in multiple tasks

2014-11-17 Thread Daniel Siegmann
I've never used Mesos, sorry.

On Fri, Nov 14, 2014 at 5:30 PM, Steve Lewis lordjoe2...@gmail.com wrote:

 The cluster runs Mesos and I can see the tasks in the Mesos UI but most
 are not doing much - any hints about that UI

 On Fri, Nov 14, 2014 at 11:39 AM, Daniel Siegmann 
 daniel.siegm...@velos.io wrote:

 Most of the information you're asking for can be found on the Spark web
 UI (see here http://spark.apache.org/docs/1.1.0/monitoring.html). You
 can see which tasks are being processed by which nodes.

 If you're using HDFS and your file size is smaller than the HDFS block
 size you will only have one partition (remember, there is exactly one task
 for each partition in a stage). If you want to force it to have more
 partitions, you can call RDD.repartition(numPartitions). Note that this
 will introduce a shuffle you wouldn't otherwise have.

 Also make sure your job is allocated more than one core in your cluster
 (you can see this on the web UI).

 On Fri, Nov 14, 2014 at 2:18 PM, Steve Lewis lordjoe2...@gmail.com
 wrote:

  I have instrumented word count to track how many machines the code runs
 on. I use an accumulator to maintain a Set or MacAddresses. I find that
 everything is done on a single machine. This is probably optimal for word
 count but not the larger problems I am working on.
 How to a force processing to be split into multiple tasks. How to I
 access the task and attempt numbers to track which processing happens in
 which attempt. Also is using MacAddress to determine which machine is
 running the code.
 As far as I can tell a simple word count is running in one thread on
  one machine and the remainder of the cluster does nothing,
 This is consistent with tests where I write to sdout from functions and
 see little output on most machines in the cluster





 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io




 --
 Steven M. Lewis PhD
 4221 105th Ave NE
 Kirkland, WA 98033
 206-384-1340 (cell)
 Skype lordjoe_com




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: RDD.aggregate versus accumulables...

2014-11-17 Thread Daniel Siegmann
You should *never* use accumulators for this purpose because you may get
incorrect answers. Accumulators can count the same thing multiple times -
you cannot rely upon the correctness of the values they compute. See
SPARK-732 https://issues.apache.org/jira/browse/SPARK-732 for more info.

On Sun, Nov 16, 2014 at 10:06 PM, Segerlind, Nathan L 
nathan.l.segerl...@intel.com wrote:

  Hi All.



 I am trying to get my head around why using accumulators and accumulables
 seems to be the most recommended method for accumulating running sums,
 averages, variances and the like, whereas the aggregate method seems to me
 to be the right one. I have no performance measurements as of yet, but it
 seems that aggregate is simpler and more intuitive (And it does what one
 might expect an accumulator to do) whereas the accumulators and
 accumulables seem to have some extra complications and overhead.



 So…



 What’s the real difference between an accumulator/accumulable and
 aggregating an RDD? When is one method of aggregation preferred over the
 other?



 Thanks,

 Nate




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-17 Thread Daniel Siegmann
I'm not aware of any such mechanism.

On Mon, Nov 17, 2014 at 2:55 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Hi Daniel,

 Yes that should work also. However, is it possible to setup so that each
 RDD has exactly one partition, without repartitioning (and thus incurring
 extra cost)? Is there a mechanism similar to MR where we can ensure each
 partition is assigned some amount of data by size, by setting some block
 size parameter?



 On Thu, Nov 13, 2014 at 1:05 PM, Daniel Siegmann daniel.siegm...@velos.io
  wrote:

 On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote


 No i don't want separate RDD because each of these partitions are being
 processed the same way (in my case, each partition corresponds to HBase
 keys belonging to one region server, and i will do HBase lookups). After
 that i have aggregations too, hence all these partitions should be in the
 same RDD. The reason to follow the partition structure is to limit
 concurrent HBase lookups targeting a single region server.


 Neither of these is necessarily a barrier to using separate RDDs. You can
 define the function you want to use and then pass it to multiple map
 methods. Then you could union all the RDDs to do your aggregations. For
 example, it might look something like this:

 val paths: String = ... // the paths to the files you want to load
 def myFunc(t: T) = ... // the function to apply to every RDD
 val rdds = paths.map { path =
 sc.textFile(path).map(myFunc)
 }
 val completeRdd = sc.union(rdds)

 Does that make any sense?

 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How do you force a Spark Application to run in multiple tasks

2014-11-14 Thread Daniel Siegmann
Most of the information you're asking for can be found on the Spark web UI (see
here http://spark.apache.org/docs/1.1.0/monitoring.html). You can see
which tasks are being processed by which nodes.

If you're using HDFS and your file size is smaller than the HDFS block size
you will only have one partition (remember, there is exactly one task for
each partition in a stage). If you want to force it to have more
partitions, you can call RDD.repartition(numPartitions). Note that this
will introduce a shuffle you wouldn't otherwise have.

Also make sure your job is allocated more than one core in your cluster
(you can see this on the web UI).

On Fri, Nov 14, 2014 at 2:18 PM, Steve Lewis lordjoe2...@gmail.com wrote:

  I have instrumented word count to track how many machines the code runs
 on. I use an accumulator to maintain a Set or MacAddresses. I find that
 everything is done on a single machine. This is probably optimal for word
 count but not the larger problems I am working on.
 How to a force processing to be split into multiple tasks. How to I access
 the task and attempt numbers to track which processing happens in which
 attempt. Also is using MacAddress to determine which machine is running the
 code.
 As far as I can tell a simple word count is running in one thread on  one
 machine and the remainder of the cluster does nothing,
 This is consistent with tests where I write to sdout from functions and
 see little output on most machines in the cluster





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-13 Thread Daniel Siegmann
Would it make sense to read each file in as a separate RDD? This way you
would be guaranteed the data is partitioned as you expected.

Possibly you could then repartition each of those RDDs into a single
partition and then union them. I think that would achieve what you expect.
But it would be easy to accidentally screw this up (have some operation
that causes a shuffle), so I think you're better off just leaving them as
separate RDDs.

On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia 
mchett...@rocketfuelinc.com wrote:

 Hi,

 I have a set of input files for a spark program, with each file
 corresponding to a logical data partition. What is the API/mechanism to
 assign each input file (or a set of files) to a spark partition, when
 initializing RDDs?

 When i create a spark RDD pointing to the directory of files, my
 understanding is it's not guaranteed that each input file will be treated
 as separate partition.

 My job semantics require that the data is partitioned, and i want to
 leverage the partitioning that has already been done, rather than
 repartitioning again in the spark job.

 I tried to lookup online but haven't found any pointers so far.


 Thanks
 pala




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-13 Thread Daniel Siegmann
I believe Rishi is correct. I wouldn't rely on that though - all it would
take is for one file to exceed the block size and you'd be setting yourself
up for pain. Also, if your files are small - small enough to fit in a
single record - you could use SparkContext.wholeTextFile.

On Thu, Nov 13, 2014 at 10:11 AM, Rishi Yadav ri...@infoobjects.com wrote:

 If your data is in hdfs and you are reading as textFile and each file is
 less than block size, my understanding is it would always have one
 partition per file.


 On Thursday, November 13, 2014, Daniel Siegmann daniel.siegm...@velos.io
 wrote:

 Would it make sense to read each file in as a separate RDD? This way you
 would be guaranteed the data is partitioned as you expected.

 Possibly you could then repartition each of those RDDs into a single
 partition and then union them. I think that would achieve what you expect.
 But it would be easy to accidentally screw this up (have some operation
 that causes a shuffle), so I think you're better off just leaving them as
 separate RDDs.

 On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia 
 mchett...@rocketfuelinc.com wrote:

 Hi,

 I have a set of input files for a spark program, with each file
 corresponding to a logical data partition. What is the API/mechanism to
 assign each input file (or a set of files) to a spark partition, when
 initializing RDDs?

 When i create a spark RDD pointing to the directory of files, my
 understanding is it's not guaranteed that each input file will be treated
 as separate partition.

 My job semantics require that the data is partitioned, and i want to
 leverage the partitioning that has already been done, rather than
 repartitioning again in the spark job.

 I tried to lookup online but haven't found any pointers so far.


 Thanks
 pala




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 54 W 40th St, New York, NY 10018
 E: daniel.siegm...@velos.io W: www.velos.io



 --
 - Rishi




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Accessing RDD within another RDD map

2014-11-13 Thread Daniel Siegmann
You cannot reference an RDD within a closure passed to another RDD. Your
code should instead look like this:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(11 to 20)
val rdd2Count = rdd2.count
rdd1.map{ i =
 rdd2Count
}
.foreach(println(_))

You should also note that even if your original code did work, you would be
re-counting rdd2 for every single record in rdd1. Unless your RDD is cached
/ persisted, the count will be recomputed every time it is called. So that
would be extremely inefficient.


On Thu, Nov 13, 2014 at 2:28 PM, Simone Franzini captainfr...@gmail.com
wrote:

 The following code fails with NullPointerException in RDD class on the
 count function:

 val rdd1 = sc.parallelize(1 to 10)
 val rdd2 = sc.parallelize(11 to 20)
 rdd1.map{ i =
  rdd2.count
 }
 .foreach(println(_))

 The same goes for any other action I am trying to perform inside the map
 statement. I am failing to understand what I am doing wrong.
 Can anyone help with this?

 Thanks,
 Simone Franzini, PhD

 http://www.linkedin.com/in/simonefranzini




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-13 Thread Daniel Siegmann
On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote


 No i don't want separate RDD because each of these partitions are being
 processed the same way (in my case, each partition corresponds to HBase
 keys belonging to one region server, and i will do HBase lookups). After
 that i have aggregations too, hence all these partitions should be in the
 same RDD. The reason to follow the partition structure is to limit
 concurrent HBase lookups targeting a single region server.


Neither of these is necessarily a barrier to using separate RDDs. You can
define the function you want to use and then pass it to multiple map
methods. Then you could union all the RDDs to do your aggregations. For
example, it might look something like this:

val paths: String = ... // the paths to the files you want to load
def myFunc(t: T) = ... // the function to apply to every RDD
val rdds = paths.map { path =
sc.textFile(path).map(myFunc)
}
val completeRdd = sc.union(rdds)

Does that make any sense?

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Is there a way to clone a JavaRDD without persisting it

2014-11-12 Thread Daniel Siegmann
As far as I know you basically have two options: let partitions be
recomputed (possibly caching / persisting memory only), or persist to disk
(and memory) and suffer the cost of writing to disk. The question is which
will be more expensive in your case. My experience is you're better off
letting things be recomputed to begin with and then trying out some
persisting.

It seems to me Spark would benefit from allowing actions to be batched up
and then having Spark execute them intelligently. That is, each partition
could be processed by multiple actions after being computed. But I don't
believe there's any way to achieve this currently.

If anyone does have a way to achieve this, I'd love to hear it. :-)

On Wed, Nov 12, 2014 at 1:23 AM, Steve Lewis lordjoe2...@gmail.com wrote:

  In my problem I have a number of intermediate JavaRDDs and would like to
 be able to look at their sizes without destroying the RDD for sibsequent
 processing. persist will do this but these are big and perisist seems
 expensive and I am unsure of which StorageLevel is needed, Is there a way
 to clone a JavaRDD or does anyong have good ideas on how to do this?




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Custom persist or cache of RDD?

2014-11-11 Thread Daniel Siegmann
But that requires an (unnecessary) load from disk.

I have run into this same issue, where we want to save intermediate results
but continue processing. The cache / persist feature of Spark doesn't seem
designed for this case. Unfortunately I'm not aware of a better solution
with the current version of Spark.

On Mon, Nov 10, 2014 at 5:15 PM, Sean Owen so...@cloudera.com wrote:

 Well you can always create C by loading B from disk, and likewise for
 E / D. No need for any custom procedure.

 On Mon, Nov 10, 2014 at 7:33 PM, Benyi Wang bewang.t...@gmail.com wrote:
  When I have a multi-step process flow like this:
 
  A - B - C - D - E - F
 
  I need to store B and D's results into parquet files
 
  B.saveAsParquetFile
  D.saveAsParquetFile
 
  If I don't cache/persist any step, spark might recompute from A,B,C,D
 and E
  if something is wrong in F.
 
  Of course, I'd better cache all steps if I have enough memory to avoid
 this
  re-computation, or persist result to disk. But persisting B and D seems
  duplicate with saving B and D as parquet files.
 
  I'm wondering if spark can restore B and D from the parquet files using a
  customized persist and restore procedure?
 
 
 
 

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: SparkContext.stop() ?

2014-10-31 Thread Daniel Siegmann
It is used to shut down the context when you're done with it, but if you're
using a context for the lifetime of your application I don't think it
matters.

I use this in my unit tests, because they start up local contexts and you
can't have multiple local contexts open so each test must stop its context
when it's done.

On Fri, Oct 31, 2014 at 11:12 AM, ll duy.huynh@gmail.com wrote:

 what is it for?  when do we call it?

 thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-stop-tp17826.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Play framework

2014-10-16 Thread Daniel Siegmann
We execute Spark jobs from a Play application but we don't use
spark-submit. I don't know if you really want to use spark-submit, but if
not you can just create a SparkContext programmatically in your app.

In development I typically run Spark locally. Creating the Spark context is
pretty trivial:

val conf = new SparkConf().setMaster(local[*]).setAppName(sMy Awesome
App)
// call conf.set for any other configuration you want
val sc = new SparkContext(sparkConf)

It is important to keep in mind you cannot have multiple local contexts
(you can create them but you'll get odd errors), so if you are running
things in parallel within your app (even unit tests) you'd need to share a
context in this case. If you are running sequentially you can create a new
local context each time, but you must make sure to call SparkContext.stop()
when you're done.

Running against a cluster is a bit more complicated because you need to add
all your dependency jars. I'm not sure how to get this to work with play run.
I stick to building the app with play dist and then running against the
packaged application, because it very conveniently provides all the
dependencies in a lib folder. Here is some code to load all the paths you
need from the dist:

def libs : Seq[String] = {
val libDir = play.api.Play.application.getFile(lib)

logger.info(sSparkContext will be initialized with libraries from
directory $libDir)

return if ( libDir.exists ) {

libDir.listFiles().map(_.getCanonicalFile().getAbsolutePath()).filter(_.endsWith(.jar))
} else {
throw new IllegalStateException(slib dir is missing: $libDir)
}
}

Creating the context is similar to above, but with this extra line:

conf.setJars(libs)

I hope this helps. I should note that I don't use play run very much, at
least not for when I'm actually executing Spark jobs. So I'm not sure if
this integrates properly with that. I have unit tests which execute on
Spark and have executed the dist package both locally and on a cluster. To
make working with the dist locally easier, I wrote myself a little shell
script to unzip and run the dist.


On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller moham...@glassbeam.com
wrote:

  Hi –



 Has anybody figured out how to integrate a Play application with Spark and
 run it on a Spark cluster using spark-submit script? I have seen some blogs
 about creating a simple Play app and running it locally on a dev machine
 with sbt run command. However, those steps don’t work for Spark-submit.



 If you have figured out how to build and run a Play app with Spark-submit,
 I would appreciate if you could share the steps and the sbt settings for
 your Play app.



 Thanks,

 Mohammed






-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Unit testing: Mocking out Spark classes

2014-10-16 Thread Daniel Siegmann
Mocking these things is difficult; executing your unit tests in a local
Spark context is preferred, as recommended in the programming guide
http://spark.apache.org/docs/latest/programming-guide.html#unit-testing.
I know this may not technically be a unit test, but it is hopefully close
enough.

You can load your test data using SparkContext.parallelize and retrieve the
data (for verification) using RDD.collect.

On Thu, Oct 16, 2014 at 9:07 AM, Saket Kumar saket.ku...@bgch.co.uk wrote:

 Hello all,

 I am trying to unit test my classes involved my Spark job. I am trying to
 mock out the Spark classes (like SparkContext and Broadcast) so that I can
 unit test my classes in isolation. However I have realised that these are
 classes instead of traits. My first question is why?

 It is quite hard to mock out classes using ScalaTest+ScalaMock as the
 classes which need to be mocked out need to be annotated with
 org.scalamock.annotation.mock as per
 http://www.scalatest.org/user_guide/testing_with_mock_objects#generatedMocks.
 I cannot do that in my case as I am trying to mock out the spark classes.

 Am I missing something? Is there a better way to do this?

 val sparkContext = mock[SparkInteraction]
 val trainingDatasetLoader = mock[DatasetLoader]
 val broadcastTrainingDatasetLoader = mock[Broadcast[DatasetLoader]]
 def transformerFunction(source: Iterator[(HubClassificationData,
 String)]): Iterator[String] = {
   source.map(_._2)
 }
 val classificationResultsRDD = mock[RDD[String]]
 val classificationResults = Array(,,)
 val inputRDD = mock[RDD[(HubClassificationData, String)]]

 inSequence{
   inAnyOrder{
 (sparkContext.broadcast[DatasetLoader]
 _).expects(trainingDatasetLoader).returns(broadcastTrainingDatasetLoader)
   }
 }

 val sparkInvoker = new SparkJobInvoker(sparkContext,
 trainingDatasetLoader)

 when(inputRDD.mapPartitions(transformerFunction)).thenReturn(classificationResultsRDD)
 sparkInvoker.invoke(inputRDD)

 Thanks,
 Saket




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Spark inside Eclipse

2014-10-02 Thread Daniel Siegmann
You don't need to do anything special to run in local mode from within
Eclipse. Just create a simple SparkConf and create a SparkContext from
that. I have unit tests which execute on a local SparkContext, and they
work from inside Eclipse as well as SBT.

val conf = new SparkConf().setMaster(local).setAppName(sWhatever)
val sc = new SparkContext(sparkConf)

Keep in mind you can only have one local SparkContext at a time, otherwise
you will get some weird errors. If you have tests running sequentially,
make sure to close the SparkContext in your tear down method. If tests run
in parallel you'll need to share the SparkContext between tests.

For unit testing, you can make use of SparkContext.parallelize to set up
your test inputs and RDD.collect to retrieve the outputs.


On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain ashish@gmail.com wrote:

 Hello Sanjay,

 This can be done, and is a very effective way to debug.

 1) Compile and package your project to get a fat jar
 2) In your SparkConf use setJars and give location of this jar. Also set
 your master here as local in SparkConf
 3) Use this SparkConf when creating JavaSparkContext
 4) Debug your program like you would any normal program.

 Hope this helps.

 Thanks
 Ashish
 On Oct 1, 2014 4:35 PM, Sanjay Subramanian
 sanjaysubraman...@yahoo.com.invalid wrote:

 hey guys

 Is there a way to run Spark in local mode from within Eclipse.
 I am running Eclipse Kepler on a Macbook Pro with Mavericks
 Like one can run hadoop map/reduce applications from within Eclipse and
 debug and learn.

 thanks

 sanjay




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to get SparckContext inside mapPartitions?

2014-10-01 Thread Daniel Siegmann
I don't think you can get a SparkContext inside an RDD function (such as
mapPartitions), but you shouldn't need to. Have you considered returning
the data read from the database from mapPartitions to create a new RDD and
then just save it to a file like normal?

For example:

rddObject.mapPartitions(x = {
x.map(getDataFromDB(_))
}, true).saveAsTextFile(hdfs:///some-folder/)

Does that make sense?

On Wed, Oct 1, 2014 at 12:52 AM, Henry Hung ythu...@winbond.com wrote:

  Hi All,



 A noob question:

 How to get SparckContext inside mapPartitions?



 Example:



 Let’s say I have rddObjects that can be split into different partitions to
 be assigned to multiple executors, to speed up the export data from
 database.



 Variable sc is created in the main program using these steps:

 val conf = new SparkConf().setAppName(ETCH VM Get FDC)

 val sc = new SparkContext(conf)



 and here is the mapPartitions code:

 rddObject.mapPartitions(x = {

   val uuid =  java.util.UUID.randomUUID.toString

   val path = new org.apache.hadoop.fs.Path(“hdfs:///some-folder/” + uuid)

   val fs = path.getFileSystem(sc.hadoopConfiguration)

   val pw = new PrintWriter(fs.create(path))

   while (x.hasNext) {

 // … do something here, like fetch data from database and write it to
 hadoop file

 pw.println(getDataFromDB(x.next))

   }

 })



 My question is how can I use sc to get the hadoopConfiguration, thus
 enable me to create Hadoop file?



 Best regards,

 Henry

 --
 The privileged confidential information contained in this email is
 intended for use only by the addressees as indicated by the original sender
 of this email. If you are not the addressee indicated in this email or are
 not responsible for delivery of the email to such a person, please kindly
 reply to the sender indicating this fact and delete all copies of it from
 your computer and network server immediately. Your cooperation is highly
 appreciated. It is advised that any unauthorized use of confidential
 information of Winbond is strictly prohibited; and any information in this
 email irrelevant to the official business of Winbond shall be deemed as
 neither given nor endorsed by Winbond.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: about partition number

2014-09-29 Thread Daniel Siegmann
A task is the work to be done on a partition for a given stage - you
should expect the number of tasks to be equal to the number of partitions
in each stage, though a task might need to be rerun (due to failure or need
to recompute some data).

2-4 times the cores in your cluster should be a good starting place. Then
you can try different values and see how it affects your performance.

On Mon, Sep 29, 2014 at 5:01 PM, anny9699 anny9...@gmail.com wrote:

 Hi,

 I read the past posts about partition number, but am still a little
 confused
 about partitioning strategy.

 I have a cluster with 8 works and 2 cores for each work. Is it true that
 the
 optimal partition number should be 2-4 * total_coreNumber or should
 approximately equal to total_coreNumber? Or it's the task number that
 really
 determines the speed rather then partition number?

 Thanks a lot!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/about-partition-number-tp15362.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: mappartitions data size

2014-09-26 Thread Daniel Siegmann
Use RDD.repartition (see here:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
).

On Fri, Sep 26, 2014 at 10:19 AM, jamborta jambo...@gmail.com wrote:

 Hi all,

 I am using mappartitions to do some heavy computing on subsets of the data.
 I have a dataset with about 1m rows, running on a 32 core cluster.
 Unfortunately, is seems that mappartitions splits the data into two sets so
 it is only running on two cores.

 Is there a way to force it to split into smaller chunks?

 thanks,




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/mappartitions-data-size-tp15231.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to do operations on multiple RDD's

2014-09-26 Thread Daniel Siegmann
There are numerous ways to combine RDDs. In your case, it seems you have
several RDDs of the same type and you want to do an operation across all of
them as if they were a single RDD. The way to do this is SparkContext.union
or RDD.union, which have minimal overhead. The only difference between
these is the latter allows you to only union two at a time (but of course
you can just call reduce on your sequence to union them all).

Keep in mind this won't repartition anything, so if you find you have too
many partitions after the union you could use RDD.coalesce to merge them.

On Fri, Sep 26, 2014 at 11:55 AM, Johan Stenberg johanstenber...@gmail.com
wrote:

 Hi,

 This is my first post to the email list so give me some feedback if I do
 something wrong.

 To do operations on two RDD's to produce a new one you can just use
 zipPartitions, but if I have an arbitrary number of RDD's that I would like
 to perform an operation on to produce a single RDD, how do I do that? I've
 been reading the docs but haven't found anything.

 For example: if I have a Seq of RDD[Array[Int]]'s and I want to take the
 majority of each array cell. So if all RDD's have one array which are like
 this:

 [1, 2, 3]
 [0, 0, 0]
 [1, 2, 0]

 Then the resulting RDD would have the array [1, 2, 0]. How do I approach
 this problem? It becomes too heavy to have an accumulator variable I guess?
 Otherwise it could be an array of maps with values as keys and frequency as
 values.

 Essentially I want something like zipPartitions but for arbitrarily many
 RDD's, is there any such functionality or how would I approach this problem?

 Cheers,

 Johan




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Spark as a Library

2014-09-16 Thread Daniel Siegmann
You can create a new SparkContext inside your container pointed to your
master. However, for your script to run you must call addJars to put the
code on your workers' classpaths (except when running locally).

Hopefully your webapp has some lib folder which you can point to as a
source for the jars. In the Play Framework you can use
play.api.Play.application.getFile(lib) to get a path to the lib directory
and get the contents. Of course that only works on the packaged web app.

On Tue, Sep 16, 2014 at 3:17 PM, Ruebenacker, Oliver A 
oliver.ruebenac...@altisource.com wrote:



  Hello,



   Thanks for the response and great to hear it is possible. But how do I
 connect to Spark without using the submit script?



   I know how to start up a master and some workers and then connect to the
 master by packaging the app that contains the SparkContext and then
 submitting the package with the spark-submit script in standalone-mode. But
 I don’t want to submit the app that contains the SparkContext via the
 script, because I want that app to be running on a web server. So, what are
 other ways to connect to Spark? I can’t find in the docs anything other
 than using the script. Thanks!



  Best, Oliver



 *From:* Matei Zaharia [mailto:matei.zaha...@gmail.com]
 *Sent:* Tuesday, September 16, 2014 1:31 PM
 *To:* Ruebenacker, Oliver A; user@spark.apache.org
 *Subject:* Re: Spark as a Library



 If you want to run the computation on just one machine (using Spark's
 local mode), it can probably run in a container. Otherwise you can create a
 SparkContext there and connect it to a cluster outside. Note that I haven't
 tried this though, so the security policies of the container might be too
 restrictive. In that case you'd have to run the app outside and expose an
 RPC interface between them.



 Matei



 On September 16, 2014 at 8:17:08 AM, Ruebenacker, Oliver A (
 oliver.ruebenac...@altisource.com) wrote:



  Hello,



   Suppose I want to use Spark from an application that I already submit to
 run in another container (e.g. Tomcat). Is this at all possible? Or do I
 have to split the app into two components, and submit one to Spark and one
 to the other container? In that case, what is the preferred way for the two
 components to communicate with each other? Thanks!



  Best, Oliver



 Oliver Ruebenacker | Solutions Architect



 Altisource™

 290 Congress St, 7th Floor | Boston, Massachusetts 02210

 P: (617) 728-5582 | ext: 275585

 oliver.ruebenac...@altisource.com | www.Altisource.com



 ***


 This email message and any attachments are intended solely for the use of
 the addressee. If you are not the intended recipient, you are prohibited
 from reading, disclosing, reproducing, distributing, disseminating or
 otherwise using this transmission. If you have received this message in
 error, please promptly notify the sender by reply email and immediately
 delete this message from your system. This message and any attachments
 may contain information that is confidential, privileged or exempt from
 disclosure. Delivery of this message to any person other than the intended
 recipient is not intended to waive any right or privilege. Message
 transmission is not guaranteed to be secure or free of software viruses.

 ***


 ***

 This email message and any attachments are intended solely for the use of
 the addressee. If you are not the intended recipient, you are prohibited
 from reading, disclosing, reproducing, distributing, disseminating or
 otherwise using this transmission. If you have received this message in
 error, please promptly notify the sender by reply email and immediately
 delete this message from your system.
 This message and any attachments may contain information that is
 confidential, privileged or exempt from disclosure. Delivery of this
 message to any person other than the intended recipient is not intended to
 waive any right or privilege. Message transmission is not guaranteed to be
 secure or free of software viruses.

 ***




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Where to save intermediate results?

2014-09-02 Thread Daniel Siegmann
I don't have any personal experience with Spark Streaming. Whether you
store your data in HDFS or a database or something else probably depends on
the nature of your use case.


On Fri, Aug 29, 2014 at 10:38 AM, huylv huy.le...@insight-centre.org
wrote:

 Hi Daniel,

 Your suggestion is definitely an interesting approach. In fact, I already
 have another system to deal with the stream analytical processing part. So
 basically, the Spark job to aggregate data just accumulatively computes
 aggregations from historical data together with new batch, which has been
 partly summarized by the stream processor. Answering queries involves in
 combining pre-calculated historical data together with on-stream
 aggregations. This sounds much like what Spark Streaming is intended to do.
 So I'll take a look deeper into Spark Streaming to consider porting the
 stream processing part to use Spark Streaming.

 Regarding saving pre-calculated data onto external storages (disk,
 database...), I'm looking at Cassandra for now. But I don't know how it
 fits
 into my context and how is its performance compared to saving to files in
 HDFS. Also, is there anyway to keep the precalculated data both on disk and
 on memory, so that when the batch job terminated, historical data still
 available on memory for combining with stream processor, while still be
 able
 to survive system failure or upgrade? Not to mention the size of
 precalculated data might get too big, in that case, partly store newest
 data
 on memory only would be better. Tachyon looks like a nice option but again,
 I don't have experience with it and it's still an experimental feature of
 Spark.

 Regards,
 Huy



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Where-to-save-intermediate-results-tp13062p13127.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Q on downloading spark for standalone cluster

2014-08-28 Thread Daniel Siegmann
If you aren't using Hadoop, I don't think it matters which you download.
I'd probably just grab the Hadoop 2 package.

Out of curiosity, what are you using as your data store? I get the
impression most Spark users are using HDFS or something built on top.


On Thu, Aug 28, 2014 at 4:07 PM, Sanjeev Sagar 
sanjeev.sa...@mypointscorp.com wrote:

 Hello there,

 I've a basic question on the downloadthat which option I need to
 downloadfor standalone cluster.

 I've a private cluster of three machineson Centos. When I click on
 download it shows me following:


Download Spark

 The latest release is Spark 1.0.2, released August 5, 2014 (release notes)
 http://spark.apache.org/releases/spark-release-1-0-2.html (git tag) 
 https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=
 8fb6f00e195fb258f3f70f04756e07c259a2351f

 Pre-built packages:

  * For Hadoop 1 (HDP1, CDH3): find an Apache mirror
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/
 spark-1.0.2-bin-hadoop1.tgz
or direct file download
http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop1.tgz
  * For CDH4: find an Apache mirror
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/
 spark-1.0.2-bin-cdh4.tgz
or direct file download
http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz
  * For Hadoop 2 (HDP2, CDH5): find an Apache mirror
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/
 spark-1.0.2-bin-hadoop2.tgz
or direct file download
http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop2.tgz

 Pre-built packages, third-party (NOTE: may include non ASF-compatible
 licenses):

  * For MapRv3: direct file download (external)
http://package.mapr.com/tools/apache-spark/1.0.2/
 spark-1.0.2-bin-mapr3.tgz
  * For MapRv4: direct file download (external)
http://package.mapr.com/tools/apache-spark/1.0.2/
 spark-1.0.2-bin-mapr4.tgz


 From the above it looks like that I've to donwload Hadoop or CDH4 first in
 order to use Spark ? I've a standalone cluster and my data size is also
 like hundreds of Gig or close to Terabyte.

 I don't get it that which one I need to download from the above list.

 Could some one assist me that which one I need to download for standalone
 cluster and for big data foot print ?

 or Hadoop is needed or mandatory for using Spark? that's not the
 understanding I've. My understanding is that you can use spark with Hadoop
 if you like from yarn2 but you could use spark standalone also without
 hadoop.

 Please assist. I'm confused !

 -Sanjeev


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Where to save intermediate results?

2014-08-28 Thread Daniel Siegmann
I assume your on-demand calculations are a streaming flow? If your data
aggregated from batch isn't too large, maybe you should just save it to
disk; when your streaming flow starts you can read the aggregations back
from disk and perhaps just broadcast them. Though I guess you'd have to
restart your streaming flow when these aggregations are updated.

For something more sophisticated, maybe look at Redis http://redis.io/ or
some distributed database? Your ETL can update that store, and your
on-demand job can query it.


On Thu, Aug 28, 2014 at 4:30 PM, huylv huy.le...@insight-centre.org wrote:

 Hi,

 I'm building a system for near real-time data analytics. My plan is to have
 an ETL batch job which calculates aggregations running periodically. User
 queries are then parsed for on-demand calculations, also in Spark. Where
 are
 the pre-calculated results supposed to be saved? I mean, after finishing
 aggregations, the ETL job will terminate, so caches are wiped out of
 memory.
 How can I use these results to calculate on-demand queries? Or more
 generally, could you please give me a good way to organize the data flow
 and
 jobs in order to achieve this?

 I'm new to Spark so sorry if this might sound like a dumb question.

 Thank you.
 Huy



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Where-to-save-intermediate-results-tp13062.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Development environment issues

2014-08-25 Thread Daniel Siegmann
On Thu, Aug 21, 2014 at 6:21 PM, pierred pie...@demartines.com wrote:

So, what is the accepted wisdom in terms of IDE and development environment?


I don't know what the accepted wisdom is. I've been getting by with the
Scala IDE for Eclipse, though I am using the stable version - as you noted,
this keeps me from upgrading to the latest Eclipse version. The quality of
the Scala IDE is poor, but I have found it generally usable. I generate the
Eclipse project files from SBT. Debugging does work (mostly) - just be
aware you can't easily step into a lambda, so it's easiest to add a
breakpoint inside of it.

As for unit testing, both Specs2 and ScalaTest work, and I can run
individual tests within Eclipse. For Specs2 there is an Eclipse plugin, and
for ScalaTest you can annotate your tests with
@RunWith(classOf[JUnitRunner]) and it'll work in the usual JUnit tools. I
have automated tests running in Bamboo. Took a bit of wrangling to get the
test output picked up, but it works.


 Is there a good tutorial to set things up so that one half of the
 libraries/tools doesn't break the other half?


No idea.


 What do you guys use?
 scala 2.10 or 2.11?
 sbt or maven?
 eclipse or idea?
 jdk7 or 8?


I'm using Java 7 and Scala 2.10.x (not every framework I use supports later
versions). SBT because I use the Play Framework, but I miss Maven. I
haven't tried IntelliJ's Scala support, but it's probably worth a shot.

The tooling isn't nearly as solid as what Java has, but I make due.

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: heterogeneous cluster hardware

2014-08-21 Thread Daniel Siegmann
If you use Spark standalone, you could start multiple workers on some
machines. Size your worker configuration to be appropriate for the weak
machines, and start multiple on your beefier machines.

It may take a bit of work to get that all hooked up - probably you'll want
to write some scripts to start everything on all your nodes correctly. But
hopefully it will work smoothly once the cluster is up and running.


On Thu, Aug 21, 2014 at 11:42 AM, anthonyjschu...@gmail.com 
anthonyjschu...@gmail.com wrote:

 Jörn, thanks for the post...

 Unfortunately, I am stuck with the hardware I have and might not be
 able to get budget allocated for a new stack of servers when I've
 already got so many ok servers on hand... And even more
 unfortunately, a large subset of these machines are... shall we say...
 extremely humble in their cpus and ram. My group has exclusive access
 to the machine and rarely do we need to run concurrent jobs-- What I
 really want is max capacity per-job. The applications are massive
 machine-learning experiments, so I'm not sure about the feasibility of
 breaking it up into concurrent jobs. At this point, I am seriously
 considering dropping down to Akka-level programming. Why, oh why,
 doesn't spark allow for allocating variable worker threads per host?
 this would seem to be the correct point of abstraction that would
 allow the construction of massive clusters using on-hand hardware?
 (the scheduler probably wouldn't have to change at all)

 On Thu, Aug 21, 2014 at 9:25 AM, Jörn Franke [via Apache Spark User
 List] [hidden email] http://user/SendEmail.jtp?type=nodenode=12587i=0
 wrote:

  Hi,
 
  Well, you could use Mesos or Yarn2 to define  resources per Job - you
 can
  give only as much resources (cores, memory etc.) per machine as your
 worst
  machine has. The rest is done by Mesos or Yarn. By doing this you avoid
 a
  per machine resource assignment without any disadvantages. You can run
  without any problems run other jobs in parallel and older machines won't
 get
  overloaded.
 
  however, you should take care that your cluster does not get too
  heterogeneous.
 
  Best regards,
  Jörn
 
  Le 21 août 2014 16:55, [hidden email] [hidden email] a écrit :
 
  I've got a stack of Dell Commodity servers-- Ram~(8 to 32Gb) single or
  dual
  quad core processor cores per machine. I think I will have them loaded
  with
  CentOS. Eventually, I may want to add GPUs on the nodes to handle
 linear
  alg. operations...
 
  My Idea has been:
 
  1) to find a way to configure Spark to allocate different resources
  per-machine, per-job. -- at least have a standard executor... and
 allow
  different machines to have different numbers of executors.
 
  2) make (using vanilla spark) a pre-run optimization phase which
  benchmarks
  the throughput of each node (per hardware), and repartition the dataset
 to
  more efficiently use the hardware rather than rely on Spark
 Speculation--
  which has always seemed a dis-optimal way to balance the load across
  several
  differing machines.
 
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/heterogeneous-cluster-hardware-tp11567p12581.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: [hidden email]
  For additional commands, e-mail: [hidden email]
 
 
 
  
  If you reply to this email, your message will be added to the discussion
  below:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/heterogeneous-cluster-hardware-tp11567p12585.html
  To unsubscribe from heterogeneous cluster hardware, click here.
  NAML



 --
 A  N  T  H  O  N  Y   Ⓙ   S  C  H  U  L  T  E

 --
 View this message in context: Re: heterogeneous cluster hardware
 http://apache-spark-user-list.1001560.n3.nabble.com/heterogeneous-cluster-hardware-tp11567p12587.html

 Sent from the Apache Spark User List mailing list archive
 http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Ways to partition the RDD

2014-08-14 Thread Daniel Siegmann
First, I think you might have a misconception about partitioning. ALL RDDs
are partitioned (even if they are a single partition). When reading from
HDFS the number of partitions depends on how the data is stored in HDFS.
After data is shuffled (generally caused by things like reduceByKey), the
number of partitions will be whatever is set as the default parallelism
(see https://spark.apache.org/docs/latest/configuration.html). Most of
these methods allow you to specify a different number of partitions as a
parameter

The number of partitions == the number of tasks. So generally the number of
partitions you want will be enough to keep all your cores (not # nodes - #
cores) busy.

Also, to create a key you can use the map method to return a Tuple2 as
Santosh showed. You can also use keyBy.

If you just want to get the number of unique users, I would do something
like this:

val userIdColIndex: Int = ...
val pageIdColIndex: Int = ...
val inputPath: String = ...
// I'm assuming the user and page ID are both strings
val pageVisitorRdd: RDD[(String, String)] = sc.textFile(inputPath).map(
record =
   val colValues = record.split('\t')
   // You might want error handling in here - I'll assume all records are
valid
   val userId = colValues(userIdColIndex)
   val pageId = colValues(pageIdColIndex)
   (pageId, userId) // Here's your key-value pair
}

So now that you have your pageId - userId mappings, what to do with them?
Maybe the most obvious would be:

val uniquePageVisits: RDD[(String, Int)] =
pageVistorRdd.groupByKey().mapValues(_.toSet.size)

But groupByKey will be a bad choice if you have many visits per page
(you'll end up with a large collection in each record). It might be better
to start with a distinct, then map to (pageId, 1) and reduceByKey(_+_) to
get the sums.

I hope that helps.


On Thu, Aug 14, 2014 at 2:14 PM, bdev buntu...@gmail.com wrote:

 Thanks, will give that a try.

 I see the number of partitions requested is 8 (through HashPartitioner(8)).
 If I have a 40 node cluster, whats the recommended number of partitions?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Ways-to-partition-the-RDD-tp12083p12128.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Ways to partition the RDD

2014-08-14 Thread Daniel Siegmann
There may be cases where you want to adjust the number of partitions or
explicitly call RDD.repartition or RDD.coalesce. However, I would start
with the defaults and then adjust if necessary to improve performance (for
example, if cores are idling because there aren't enough tasks you may want
more partitions).

Looks like PairRDDFunctions has a countByKey (though you'd still need to
distinct first). You might also look at combineByKey and foldByKey as
alternatives to reduceByKey or groupByKey.


On Thu, Aug 14, 2014 at 4:14 PM, bdev buntu...@gmail.com wrote:

 Thanks Daniel for the detailed information. Since the RDD is already
 partitioned, there is no need to worry about repartitioning.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Ways-to-partition-the-RDD-tp12083p12136.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Number of partitions and Number of concurrent tasks

2014-08-01 Thread Daniel Siegmann
It is definitely possible to run multiple workers on a single node and have
each worker with the maximum number of cores (e.g. if you have 8 cores and
2 workers you'd have 16 cores per node). I don't know if it's possible with
the out of the box scripts though.

It's actually not really that difficult. You just run start-slave.sh
multiple times on the same node, with different IDs. Here is the usage:

# Usage: start-slave.sh worker# master-spark-URL

But we have custom scripts to do that. I'm not sure whether it is possible
using the standard start-all.sh script or that EC2 script. Probably not.

I haven't set up or managed such a cluster myself, so that's about the
extent of my knowledge. But I've deployed jobs to that cluster and enjoyed
the benefit of double the cores - we had a fair amount of I/O though, which
may be why it helped in our case. I recommend taking a look at the CPU
utilization on the nodes when running a flow before jumping through these
hoops.


On Fri, Aug 1, 2014 at 12:05 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Darin,

 I think the number of cores in your cluster is a hard limit on how many
 concurrent tasks you can execute at one time. If you want more parallelism,
 I think you just need more cores in your cluster--that is, bigger nodes, or
 more nodes.

 Daniel,

 Have you been able to get around this limit?

 Nick



 On Fri, Aug 1, 2014 at 11:49 AM, Daniel Siegmann daniel.siegm...@velos.io
  wrote:

 Sorry, but I haven't used Spark on EC2 and I'm not sure what the problem
 could be. Hopefully someone else will be able to help. The only thing I
 could suggest is to try setting both the worker instances and the number of
 cores (assuming spark-ec2 has such a parameter).


 On Thu, Jul 31, 2014 at 3:03 PM, Darin McBeath ddmcbe...@yahoo.com
 wrote:

 Ok, I set the number of spark worker instances to 2 (below is my startup
 command).  But, this essentially had the effect of increasing my number of
 workers from 3 to 6 (which was good) but it also reduced my number of cores
 per worker from 8 to 4 (which was not so good).  In the end, I would still
 only be able to concurrently process 24 partitions in parallel.  I'm
 starting a stand-alone cluster using the spark provided ec2 scripts .  I
 tried setting the env variable for SPARK_WORKER_CORES in the spark_ec2.py
 but this had no effect. So, it's not clear if I could even set the
 SPARK_WORKER_CORES with the ec2 scripts.  Anyway, not sure if there is
 anything else I can try but at least wanted to document what I did try and
 the net effect.  I'm open to any suggestions/advice.

  ./spark-ec2 -k *key* -i key.pem --hadoop-major-version=2 launch -s 3
 -t m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2
 *my-cluster*


   --
  *From:* Daniel Siegmann daniel.siegm...@velos.io
 *To:* Darin McBeath ddmcbe...@yahoo.com
 *Cc:* Daniel Siegmann daniel.siegm...@velos.io; user@spark.apache.org
 user@spark.apache.org
 *Sent:* Thursday, July 31, 2014 10:04 AM

 *Subject:* Re: Number of partitions and Number of concurrent tasks

 I haven't configured this myself. I'd start with setting
 SPARK_WORKER_CORES to a higher value, since that's a bit simpler than
 adding more workers. This defaults to all available cores according to
 the documentation, so I'm not sure if you can actually set it higher. If
 not, you can get around this by adding more worker instances; I believe
 simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient.

 I don't think you *have* to set the cores if you have more workers - it
 will default to 8 cores per worker (in your case). But maybe 16 cores per
 node will be too many. You'll have to test. Keep in mind that more workers
 means more memory and such too, so you may need to tweak some other
 settings downward in this case.

 On a side note: I've read some people found performance was better when
 they had more workers with less memory each, instead of a single worker
 with tons of memory, because it cut down on garbage collection time. But I
 can't speak to that myself.

 In any case, if you increase the number of cores available in your
 cluster (whether per worker, or adding more workers per node, or of course
 adding more nodes) you should see more tasks running concurrently. Whether
 this will actually be *faster* probably depends mainly on whether the
 CPUs in your nodes were really being fully utilized with the current number
 of cores.


 On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath ddmcbe...@yahoo.com
 wrote:

 Thanks.

  So to make sure I understand.  Since I'm using a 'stand-alone'
 cluster, I would set SPARK_WORKER_INSTANCES to something like 2
 (instead of the default value of 1).  Is that correct?  But, it also sounds
 like I need to explicitly set a value for SPARKER_WORKER_CORES (based on
 what the documentation states).  What would I want that value to be based
 on my configuration below?  Or, would I leave that alone

Re: Number of partitions and Number of concurrent tasks

2014-07-30 Thread Daniel Siegmann
This is correct behavior. Each core can execute exactly one task at a
time, with each task corresponding to a partition. If your cluster only has
24 cores, you can only run at most 24 tasks at once.

You could run multiple workers per node to get more executors. That would
give you more cores in the cluster. But however many cores you have, each
core will run only one task at a time.


On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote:

 I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.

 I have an RDDString which I've repartitioned so it has 100 partitions
 (hoping to increase the parallelism).

 When I do a transformation (such as filter) on this RDD, I can't  seem to
 get more than 24 tasks (my total number of cores across the 3 nodes) going
 at one point in time.  By tasks, I mean the number of tasks that appear
 under the Application UI.  I tried explicitly setting the
 spark.default.parallelism to 48 (hoping I would get 48 tasks concurrently
 running) and verified this in the Application UI for the running
 application but this had no effect.  Perhaps, this is ignored for a
 'filter' and the default is the total number of cores available.

 I'm fairly new with Spark so maybe I'm just missing or misunderstanding
 something fundamental.  Any help would be appreciated.

 Thanks.

 Darin.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: mapToPair vs flatMapToPair vs flatMap function usage.

2014-07-25 Thread Daniel Siegmann
The map and flatMap methods have a similar purpose, but map is 1 to 1,
while flatMap is 1 to 0-N (outputting 0 is similar to a filter, except of
course it could be outputting a different type).


On Thu, Jul 24, 2014 at 6:41 PM, abhiguruvayya sharath.abhis...@gmail.com
wrote:

 Can any one help me understand the key difference between mapToPair vs
 flatMapToPair vs flatMap functions and also when to apply these functions
 in
 particular.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Daniel Siegmann
I can confirm this bug. The behavior for groupByKey is the same as
reduceByKey - your example is actually grouping on just the name. Try this:

sc.parallelize(ps).map(x= (x,1)).groupByKey().collect
res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
(P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
(P(charly),ArrayBuffer(1)))


On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Just to narrow down the issue, it looks like the issue is in 'reduceByKey'
 and derivates like 'distinct'.

 groupByKey() seems to work

 sc.parallelize(ps).map(x= (x.name,1)).groupByKey().collect
 res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
 (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))



 On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

 A minimal example:

 case class P(name:String)
 val ps = Array(P(alice), P(bob), P(charly), P(bob))
 sc.parallelize(ps).map(x= (x,1)).reduceByKey((x,y) = x+y).collect
 [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
 (P(bob),1), (P(abe),1), (P(charly),1))

 In contrast to the expected behavior, that should be equivalent to:
 sc.parallelize(ps).map(x= (x.name,1)).reduceByKey((x,y) = x+y).collect
 Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))

 Any ideas why this doesn't work?

 -kr, Gerard.





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Can we get a spark context inside a mapper

2014-07-14 Thread Daniel Siegmann
Rahul, I'm not sure what you mean by your question being very
unprofessional. You can feel free to answer such questions here. You may
or may not receive an answer, and you shouldn't necessarily expect to have
your question answered within five hours.

I've never tried to do anything like your case. I imagine the easiest thing
would be to read and process each file individually, since you are
intending to produce a separate result for each. You could also look at
RDD.wholeTextFiles - maybe that will be of some use if your files are small
- but I don't know of any corresponding save method which would generate
files with different names from within a single RDD.


On Mon, Jul 14, 2014 at 2:30 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com
 wrote:

 I understand that the question is very unprofessional, but I am a newbie.
 If you could share some link where I can ask such questions, if not here.

 But please answer.


 On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani 
 rahulbhojwani2...@gmail.com wrote:

 Hey, My question is for this situation:
 Suppose we have 10 files each containing list of features in each row.

 Task is that for each file cluster the features in that file and write
 the corresponding cluster along with it in a new file.  So we have to
 generate 10 more files by applying clustering in each file
 individually.

 So can I do it this way, that get rdd of list of files and apply map.
 Inside the mapper function which will be handling each file, get another
 spark context and use Mllib kmeans to get the clustered output file.

 Please suggest the appropriate method to tackle this problem.

 Thanks,
 Rahul Kumar Bhojwani
 3rd year, B.Tech
 Computer Science Engineering
 National Institute Of Technology, Karnataka
 9945197359




 --
 Rahul K Bhojwani
 3rd Year B.Tech
 Computer Science and Engineering
 National Institute of Technology, Karnataka




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Memory compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
I don't have a solution for you (sorry), but do note that
rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
set shuffle=true then it should repartition and redistribute the data. But
it uses the hash partitioner according to the ScalaDoc - I don't know of
any way to supply a custom partitioner.


On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote:

 I'm trying to run a job that includes an invocation of a memory 
 compute-intensive multithreaded C++ program, and so I'd like to run one
 task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
 one task per core, and so runs out of memory on the node. Is there any way
 to give the scheduler a hint that the task uses lots of memory and cores so
 it spreads it out more evenly?

 Thanks,

 Ravi Pandya
 Microsoft Research




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Memory compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
Depending on how your C++ program is designed, maybe you can feed the data
from multiple partitions into the same process? Getting the results back
might be tricky. But that may be the only way to guarantee you're only
using one invocation per node.


On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 I think coalesce with shuffle=true will force it to have one task per
 node. Without that, it might be that due to data locality it decides to
 launch multiple ones on the same node even though the total # of tasks is
 equal to the # of nodes.

 If this is the *only* thing you run on the cluster, you could also
 configure the Workers to only report one core by manually launching the
 spark.deploy.worker.Worker process with that flag (see
 http://spark.apache.org/docs/latest/spark-standalone.html).

 Matei

 On Jul 14, 2014, at 1:59 PM, Daniel Siegmann daniel.siegm...@velos.io
 wrote:

 I don't have a solution for you (sorry), but do note that
 rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
 set shuffle=true then it should repartition and redistribute the data.
 But it uses the hash partitioner according to the ScalaDoc - I don't know
 of any way to supply a custom partitioner.


 On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya r...@iecommerce.com wrote:

 I'm trying to run a job that includes an invocation of a memory 
 compute-intensive multithreaded C++ program, and so I'd like to run one
 task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
 one task per core, and so runs out of memory on the node. Is there any way
 to give the scheduler a hint that the task uses lots of memory and cores so
 it spreads it out more evenly?

 Thanks,

 Ravi Pandya
 Microsoft Research




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: All of the tasks have been completed but the Stage is still shown as Active?

2014-07-10 Thread Daniel Siegmann
One thing to keep in mind is that the progress bar doesn't take into
account tasks which are rerun. If you see 4/4 but the stage is still
active, click the stage name and look at the task list. That will show you
if any are actually running. When rerun tasks complete, it can result in
the number of successful tasks being greater than the number of total
tasks; e.g. the progress bar might display 5/4.

Another bug is that a stage might complete and be moved to the completed
list, but if tasks are then rerun it will appear in both the completed and
active stages list. If it completes again, you will see that stage *twice*
in the completed stages list.

Of course, you should only be seeing this behavior if things are going
wrong; a node failing, for example.


On Thu, Jul 10, 2014 at 4:21 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I'm running an App for hours in a standalone cluster. From the data
 injector and Streaming tab of web ui, it's running well.

 However, I see quite a lot of Active stages in web ui even some of them
 have all of their tasks completed.

 I attach a screenshot for your reference.

 Do you ever see this kind of behavior?




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Comparative study

2014-07-08 Thread Daniel Siegmann
In addition to Scalding and Scrunch, there is Scoobi. Unlike the others, it
is only Scala (it doesn't wrap a Java framework). All three have fairly
similar APIs and aren't too different from Spark. For example, instead of
RDD you have DList (distributed list) or PCollection (parallel collection)
- or in Scalding's case, Pipe, because Cascading had to get cute with its
names.


On Mon, Jul 7, 2014 at 8:12 PM, Sean Owen so...@cloudera.com wrote:

 On Tue, Jul 8, 2014 at 1:05 AM, Nabeel Memon nm3...@gmail.com wrote:

 For Scala API on map/reduce (hadoop engine) there's a library called
 Scalding. It's built on top of Cascading. If you have a huge dataset or
 if you consider using map/reduce engine for your job, for any reason, you
 can try Scalding.


 PS Crunch also has a Scala API called Scrunch. And Crunch can run its jobs
 on Spark too, not just M/R.





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Comparative study

2014-07-08 Thread Daniel Siegmann
I don't have those numbers off-hand. Though the shuffle spill to disk was
coming to several gigabytes per node, if I recall correctly.

The MapReduce pipeline takes about 2-3 hours I think for the full 60 day
data set. Spark chugs along fine for awhile and then hangs. We restructured
the flow a few times, but in the last iteration it was hanging when trying
to save the feature profiles with just a couple of tasks remaining (those
tasks ran for 10+ hours before we killed it). In a previous iteration we
did get it to run through. We broke our flow into two parts though - first
saving the raw profiles out to disk, then reading them back in for scoring.

That was on just 10 days of data, by the way - one sixth of what the
MapReduce flow normally runs through on the same cluster.

I haven't tracked down the cause. YMMV


On Mon, Jul 7, 2014 at 8:14 PM, Soumya Simanta soumya.sima...@gmail.com
wrote:



 Daniel,

 Do you mind sharing the size of your cluster and the production data
 volumes ?

 Thanks
 Soumya

 On Jul 7, 2014, at 3:39 PM, Daniel Siegmann daniel.siegm...@velos.io
 wrote:

 From a development perspective, I vastly prefer Spark to MapReduce. The
 MapReduce API is very constrained; Spark's API feels much more natural to
 me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

 Unfortunately, the picture isn't so rosy when it gets to production. In my
 experience, Spark simply doesn't scale to the volumes that MapReduce will
 handle. Not with a Standalone cluster anyway - maybe Mesos or YARN would be
 better, but I haven't had the opportunity to try them. I find jobs tend to
 just hang forever for no apparent reason on large data sets (but smaller
 than what I push through MapReduce).

 I am hopeful the situation will improve - Spark is developing quickly -
 but if you have large amounts of data you should proceed with caution.

 Keep in mind there are some frameworks for Hadoop which can hide the ugly
 MapReduce with something very similar in form to Spark's API; e.g. Apache
 Crunch. So you might consider those as well.

 (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




 --
 Daniel Siegmann, Software Developer
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Comparative study

2014-07-08 Thread Daniel Siegmann
I believe our full 60 days of data contains over ten million unique
entities. Across 10 days I'm not sure, but it should be in the millions. I
haven't verified that myself though. So that's the scale of the RDD we're
writing to disk (each entry is entityId - profile).

I think it's hard to know how Spark will hold up without trying yourself,
on your own flow. Also, keep in mind this was with a Spark Standalone
cluster - perhaps Mesos or YARN would hold up better.


On Tue, Jul 8, 2014 at 1:04 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 I'll respond for Dan.

 Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

 I'm not sure what the size of the final output data was but I think it was
 on the order of 20 GBs for the given 10 GB of input data. Also, I can say
 that when we were experimenting with persist(DISK_ONLY), the size of all
 RDDs on disk was around 200 GB, which gives a sense of overall transient
 memory usage with no persistence.

 In terms of our test cluster, we had 15 nodes. Each node had 24 cores and
 2 workers each. Each executor got 14 GB of memory.

 -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce. The
 MapReduce API is very constrained; Spark's API feels much more natural to
 me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production. In
 my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly -
 but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




 --
  Daniel Siegmann, Software Developer
 Velos
  Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Comparative study

2014-07-08 Thread Daniel Siegmann
I think we're missing the point a bit. Everything was actually flowing
through smoothly and in a reasonable time. Until it reached the last two
tasks (out of over a thousand in the final stage alone), at which point it
just fell into a coma. Not so much as a cranky message in the logs.

I don't know *why* that happened. Maybe it isn't the overall amount of
data, but something I'm doing wrong with my flow. In any case, improvements
to diagnostic info would probably be helpful.

I look forward to the next release. :-)


On Tue, Jul 8, 2014 at 3:47 PM, Reynold Xin r...@databricks.com wrote:

 Not sure exactly what is happening but perhaps there are ways to
 restructure your program for it to work better. Spark is definitely able to
 handle much, much larger workloads.

 I've personally run a workload that shuffled 300 TB of data. I've also ran
 something that shuffled 5TB/node and stuffed my disks fairly full that the
 file system is close to breaking.

 We can definitely do a better job in Spark to make it output more
 meaningful diagnosis and more robust with partitions of data that don't fit
 in memory though. A lot of the work in the next few releases will be on
 that.



 On Tue, Jul 8, 2014 at 10:04 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 I'll respond for Dan.

 Our test dataset was a total of 10 GB of input data (full production
 dataset for this particular dataflow would be 60 GB roughly).

 I'm not sure what the size of the final output data was but I think it
 was on the order of 20 GBs for the given 10 GB of input data. Also, I can
 say that when we were experimenting with persist(DISK_ONLY), the size of
 all RDDs on disk was around 200 GB, which gives a sense of overall
 transient memory usage with no persistence.

 In terms of our test cluster, we had 15 nodes. Each node had 24 cores and
 2 workers each. Each executor got 14 GB of memory.

 -Suren



 On Tue, Jul 8, 2014 at 12:06 PM, Kevin Markey kevin.mar...@oracle.com
 wrote:

  When you say large data sets, how large?
 Thanks


 On 07/07/2014 01:39 PM, Daniel Siegmann wrote:

  From a development perspective, I vastly prefer Spark to MapReduce.
 The MapReduce API is very constrained; Spark's API feels much more natural
 to me. Testing and local development is also very easy - creating a local
 Spark context is trivial and it reads local files. For your unit tests you
 can just have them create a local context and execute your flow with some
 test data. Even better, you can do ad-hoc work in the Spark shell and if
 you want that in your production code it will look exactly the same.

  Unfortunately, the picture isn't so rosy when it gets to production.
 In my experience, Spark simply doesn't scale to the volumes that MapReduce
 will handle. Not with a Standalone cluster anyway - maybe Mesos or YARN
 would be better, but I haven't had the opportunity to try them. I find jobs
 tend to just hang forever for no apparent reason on large data sets (but
 smaller than what I push through MapReduce).

  I am hopeful the situation will improve - Spark is developing quickly
 - but if you have large amounts of data you should proceed with caution.

  Keep in mind there are some frameworks for Hadoop which can hide the
 ugly MapReduce with something very similar in form to Spark's API; e.g.
 Apache Crunch. So you might consider those as well.

  (Note: the above is with Spark 1.0.0.)



 On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com
 wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR
 implementation to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




 --
  Daniel Siegmann, Software Developer
 Velos
  Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
 E: daniel.siegm...@velos.io W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@v suren.hira...@sociocast.comelos.io
 W: www.velos.io





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW

Re: Control number of tasks per stage

2014-07-07 Thread Daniel Siegmann
The default number of tasks when reading files is based on how the files
are split among the nodes. Beyond that, the default number of tasks after a
shuffle is based on the property spark.default.parallelism. (see
http://spark.apache.org/docs/latest/configuration.html).

You can use RDD.repartition to increase or decrease the number of tasks (or
RDD.coalesce, but you must set shuffle to true if you want to increase the
partitions). Other RDD methods which cause a shuffle usually have a
parameter to set the number of tasks.



On Mon, Jul 7, 2014 at 11:25 AM, Konstantin Kudryavtsev 
kudryavtsev.konstan...@gmail.com wrote:

 Hi all,

 is it any way to control the number tasks per stage?

 currently I see situation when only 2 tasks are created per stage and each
 of them is very slow, at the same time cluster has a huge number of unused
 nodes


 Thank you,
 Konstantin Kudryavtsev




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Comparative study

2014-07-07 Thread Daniel Siegmann
From a development perspective, I vastly prefer Spark to MapReduce. The
MapReduce API is very constrained; Spark's API feels much more natural to
me. Testing and local development is also very easy - creating a local
Spark context is trivial and it reads local files. For your unit tests you
can just have them create a local context and execute your flow with some
test data. Even better, you can do ad-hoc work in the Spark shell and if
you want that in your production code it will look exactly the same.

Unfortunately, the picture isn't so rosy when it gets to production. In my
experience, Spark simply doesn't scale to the volumes that MapReduce will
handle. Not with a Standalone cluster anyway - maybe Mesos or YARN would be
better, but I haven't had the opportunity to try them. I find jobs tend to
just hang forever for no apparent reason on large data sets (but smaller
than what I push through MapReduce).

I am hopeful the situation will improve - Spark is developing quickly - but
if you have large amounts of data you should proceed with caution.

Keep in mind there are some frameworks for Hadoop which can hide the ugly
MapReduce with something very similar in form to Spark's API; e.g. Apache
Crunch. So you might consider those as well.

(Note: the above is with Spark 1.0.0.)



On Mon, Jul 7, 2014 at 11:07 AM, santosh.viswanat...@accenture.com wrote:

  Hello Experts,



 I am doing some comparative study on the below:



 Spark vs Impala

 Spark vs MapREduce . Is it worth migrating from existing MR implementation
 to Spark?





 Please share your thoughts and expertise.





 Thanks,
 Santosh

 --

 This message is for the designated recipient only and may contain
 privileged, proprietary, or otherwise confidential information. If you have
 received it in error, please notify the sender immediately and delete the
 original. Any other use of the e-mail by you is prohibited. Where allowed
 by local law, electronic communications with Accenture and its affiliates,
 including e-mail and instant messaging (including content), may be scanned
 by our systems for the purposes of information security and assessment of
 internal compliance with Accenture policy.

 __

 www.accenture.com




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Map with filter on JavaRdd

2014-06-27 Thread Daniel Siegmann
If for some reason it would be easier to do your mapping and filtering in a
single function, you can also use RDD.flatMap (returning an empty sequence
is equivalent to a filter). But unless you have good reason you should have
a separate map and filter transform, as Mayur said.


On Fri, Jun 27, 2014 at 7:43 AM, ajay garg ajay.g...@mobileum.com wrote:

 Thanks Mayur for clarification..



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401p8410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: partitions, coalesce() and parallelism

2014-06-25 Thread Daniel Siegmann
:
 collect at console:48, took 0.821161249 s
 res7: Array[Int] = Array(1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008,
 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020,
 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039, 1040, 1051, 1052,
 1053, 1054, 1055, 1056, 1057, 1058, 1059, 1060, 1081, 1082, 1083, 1084,
 1085, 1086, 1087, 1088, 1089, 1090, 1101, 1102, 1103, 1104, 1105, 1106,
 1107, 1108, 1109, 1110, 1121, 1122, 1123, 1124, 1125, 1126, 1127, 1128,
 1129, 1130, 1141, 1142, 1143, 1144, 1145, 1146, 1147, 1148, 1149, 1150,
 1161, 1162, 1163, 1164, 1165, 1166, 1167, 1168, 1169, 1170, 1181, 1182,
 1183, 1184, 1185, 1186, 1187, 1188, 1189, 1190, 1201, 1202, 1203, 1204,
 1205, 1206, 1207, 1208, 1209, 1210, 1221, 1222, 1223, 1224, 1225, 1226,
 1227, 1228, 1229, 1230, 1241, 1242, 1243, 1244, 1245, 1246, 1247, 1248,
 1249...




 On Tue, Jun 24, 2014 at 5:39 PM, Alex Boisvert alex.boisv...@gmail.com
 wrote:

 Yes.

 scala rawLogs.partitions.size
 res1: Int = 2171



 On Tue, Jun 24, 2014 at 4:00 PM, Mayur Rustagi mayur.rust...@gmail.com
 wrote:

 To be clear number of map tasks are determined by number of partitions
 inside the rdd hence the suggestion by Nicholas.

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Wed, Jun 25, 2014 at 4:17 AM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 So do you get 2171 as the output for that command? That command tells
 you how many partitions your RDD has, so it’s good to first confirm that
 rdd1 has as many partitions as you think it has.
 ​


 On Tue, Jun 24, 2014 at 4:22 PM, Alex Boisvert 
 alex.boisv...@gmail.com wrote:

 It's actually a set of 2171 S3 files, with an average size of about
 18MB.


 On Tue, Jun 24, 2014 at 1:13 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 What do you get for rdd1._jrdd.splits().size()? You might think
 you’re getting  100 partitions, but it may not be happening.
 ​


 On Tue, Jun 24, 2014 at 3:50 PM, Alex Boisvert 
 alex.boisv...@gmail.com wrote:

 With the following pseudo-code,

 val rdd1 = sc.sequenceFile(...) // has  100 partitions
 val rdd2 = rdd1.coalesce(100)
 val rdd3 = rdd2 map { ... }
 val rdd4 = rdd3.coalesce(2)
 val rdd5 = rdd4.saveAsTextFile(...) // want only two output files

 I would expect the parallelism of the map() operation to be 100
 concurrent tasks, and the parallelism of the save() operation to be 2.

 However, it appears the parallelism of the entire chain is 2 -- I
 only see two tasks created for the save() operation and those tasks 
 appear
 to execute the map() operation as well.

 Assuming what I'm seeing is as-specified (meaning, how things are
 meant to be), what's the recommended way to force a parallelism of 100 
 on
 the map() operation?

 thanks!












-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: guidance on simple unit testing with Spark

2014-06-16 Thread Daniel Siegmann
If you don't want to refactor your code, you can put your input into a test
file. After the test runs, read the data from the output file you specified
(probably want this to be a temp file and delete on exit). Of course, that
is not really a unit test - Metei's suggestion is preferable (this is how
we test). However, if you have a long and complex flow, you might unit test
different parts, and then have an integration test which reads from the
files and tests the whole flow together (I do this as well).




On Fri, Jun 13, 2014 at 10:04 PM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 You need to factor your program so that it’s not just a main(). This is
 not a Spark-specific issue, it’s about how you’d unit test any program in
 general. In this case, your main() creates a SparkContext, so you can’t
 pass one from outside, and your code has to read data from a file and write
 it to a file. It would be better to move your code for transforming data
 into a new function:

 def processData(lines: RDD[String]): RDD[String] = {
   // build and return your “res” variable
 }

 Then you can unit-test this directly on data you create in your program:

 val myLines = sc.parallelize(Seq(“line 1”, “line 2”))
 val result = GetInfo.processData(myLines).collect()
 assert(result.toSet === Set(“res 1”, “res 2”))

 Matei

 On Jun 13, 2014, at 2:42 PM, SK skrishna...@gmail.com wrote:

  Hi,
 
  I have looked through some of the  test examples and also the brief
  documentation on unit testing at
  http://spark.apache.org/docs/latest/programming-guide.html#unit-testing,
 but
  still dont have a good understanding of writing unit tests using the
 Spark
  framework. Previously, I have written unit tests using specs2 framework
 and
  have got them to work in Scalding.  I tried to use the specs2 framework
 with
  Spark, but could not find any simple examples I could follow. I am open
 to
  specs2 or Funsuite, whichever works best with Spark. I would like some
  additional guidance, or some simple sample code using specs2 or
 Funsuite. My
  code is provided below.
 
 
  I have the following code in src/main/scala/GetInfo.scala. It reads a
 Json
  file and extracts some data. It takes the input file (args(0)) and output
  file (args(1)) as arguments.
 
  object GetInfo{
 
def main(args: Array[String]) {
  val inp_file = args(0)
  val conf = new SparkConf().setAppName(GetInfo)
  val sc = new SparkContext(conf)
  val res = sc.textFile(log_file)
.map(line = { parse(line) })
.map(json =
   {
  implicit lazy val formats =
  org.json4s.DefaultFormats
  val aid = (json \ d \ TypeID).extract[Int]
  val ts = (json \ d \ TimeStamp).extract[Long]
  val gid = (json \ d \ ID).extract[String]
  (aid, ts, gid)
   }
 )
.groupBy(tup = tup._3)
.sortByKey(true)
.map(g = (g._1, g._2.map(_._2).max))
  res.map(tuple= %s, %d.format(tuple._1,
  tuple._2)).saveAsTextFile(args(1))
  }
 
 
  I would like to test the above code. My unit test is in src/test/scala.
 The
  code I have so far for the unit test appears below:
 
  import org.apache.spark._
  import org.specs2.mutable._
 
  class GetInfoTest extends Specification with java.io.Serializable{
 
  val data = List (
   (d: {TypeID = 10, Timestamp: 1234, ID: ID1}),
   (d: {TypeID = 11, Timestamp: 5678, ID: ID1}),
   (d: {TypeID = 10, Timestamp: 1357, ID: ID2}),
   (d: {TypeID = 11, Timestamp: 2468, ID: ID2})
 )
 
  val expected_out = List(
 (ID1,5678),
 (ID2,2468),
  )
 
 A GetInfo job should {
  //* How do I pass data define above as input and output
  which GetInfo expects as arguments? **
  val sc = new SparkContext(local, GetInfo)
 
  //*** how do I get the output ***
 
   //assuming out_buffer has the output I want to match it to
 the
  expected output
  match expected output in {
   ( out_buffer == expected_out) must beTrue
  }
  }
 
  }
 
  I would like some help with the tasks marked with  in the unit test
  code above. If specs2 is not the right way to go, I am also open to
  FunSuite. I would like to know how to pass the input while calling my
  program from the unit test and get the output.
 
  Thanks for your help.
 
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/guidance-on-simple-unit-testing-with-Spark-tp7604.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing file

2014-06-12 Thread Daniel Siegmann
The old behavior (A) was dangerous, so it's good that (B) is now the
default. But in some cases I really do want to replace the old data, as per
(C). For example, I may rerun a previous computation (perhaps the input
data was corrupt and I'm rerunning with good input).

Currently I have to write separate code to remove the files before calling
Spark. It would be very convenient if Spark could do this for me. Has
anyone created a JIRA issue to support (C)?


On Mon, Jun 9, 2014 at 3:02 AM, Aaron Davidson ilike...@gmail.com wrote:

 It is not a very good idea to save the results in the exact same place as
 the data. Any failures during the job could lead to corrupted data, because
 recomputing the lost partitions would involve reading the original
 (now-nonexistent) data.

 As such, the only safe way to do this would be to do as you said, and
 only delete the input data once the entire output has been successfully
 created.


 On Sun, Jun 8, 2014 at 10:32 PM, innowireless TaeYun Kim 
 taeyun@innowireless.co.kr wrote:

 Without (C), what is the best practice to implement the following
 scenario?

 1. rdd = sc.textFile(FileA)
 2. rdd = rdd.map(...)  // actually modifying the rdd
 3. rdd.saveAsTextFile(FileA)

 Since the rdd transformation is 'lazy', rdd will not materialize until
 saveAsTextFile(), so FileA must still exist, but it must be deleted before
 saveAsTextFile().

 What I can think is:

 3. rdd.saveAsTextFile(TempFile)
 4. delete FileA
 5. rename TempFile to FileA

 This is not very convenient...

 Thanks.

 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]
 Sent: Tuesday, June 03, 2014 11:40 AM
 To: user@spark.apache.org
 Subject: Re: How can I make Spark 1.0 saveAsTextFile to overwrite existing
 file

 (A) Semantics in Spark 0.9 and earlier: Spark will ignore Hadoo's output
 format check and overwrite files in the destination directory.
 But it won't clobber the directory entirely. I.e. if the directory already
 had part1 part2 part3 part4 and you write a new job outputing only
 two files (part1, part2) then it would leave the other two files
 intact,
 confusingly.

 (B) Semantics in Spark 1.0 and earlier: Runs Hadoop OutputFormat check
 which
 means the directory must not exist already or an excpetion is thrown.

 (C) Semantics proposed by Nicholas Chammas in this thread (AFAIK):
 Spark will delete/clobber an existing destination directory if it exists,
 then fully over-write it with new data.

 I'm fine to add a flag that allows (B) for backwards-compatibility
 reasons,
 but my point was I'd prefer not to have (C) even though I see some cases
 where it would be useful.

 - Patrick

 On Mon, Jun 2, 2014 at 4:25 PM, Sean Owen so...@cloudera.com wrote:
  Is there a third way? Unless I miss something. Hadoop's OutputFormat
  wants the target dir to not exist no matter what, so it's just a
  question of whether Spark deletes it for you or errors.
 
  On Tue, Jun 3, 2014 at 12:22 AM, Patrick Wendell pwend...@gmail.com
 wrote:
  We can just add back a flag to make it backwards compatible - it was
  just missed during the original PR.
 
  Adding a *third* set of clobber semantics, I'm slightly -1 on that
  for the following reasons:
 
  1. It's scary to have Spark recursively deleting user files, could
  easily lead to users deleting data by mistake if they don't
  understand the exact semantics.
  2. It would introduce a third set of semantics here for saveAsXX...
  3. It's trivial for users to implement this with two lines of code
  (if output dir exists, delete it) before calling saveAsHadoopFile.
 
  - Patrick
 





-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


  1   2   >