Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread Daniel Siegmann
Those settings apply when a shuffle happens. But they don't affect the way
the data will be partitioned when it is initially read, for example
spark.read.parquet("path/to/input"). So for HDFS / S3 I think it depends on
how the data is split into chunks, but if there are lots of small chunks
Spark will automatically merge them into fewer, larger partitions. There
are various settings depending on what you're reading from.

val df = spark.read.parquet("path/to/input") // partitioning will depend on the data
val df2 = df.groupBy("thing").count() // a shuffle happened, so the shuffle partitioning configuration applies


Tip: gzip files can't be split, so if you read a gzip file everything will
be in one partition. That's a good reason to avoid large gzip files. :-)

If you don't have a shuffle but you want to change how many partitions
there are, you will need to coalesce or repartition.
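
For example, continuing from the df above (the numbers here are arbitrary):

// coalesce narrows the existing partitions without a full shuffle
// (typically used to reduce the partition count).
val fewer = df.coalesce(10)

// repartition does a full shuffle and can increase or decrease the count,
// optionally hashing by a column.
val more = df.repartition(100)
val byKey = df.repartition(100, df("thing"))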


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Thu, Oct 26, 2017 at 11:31 AM, lucas.g...@gmail.com  wrote:

> Thanks Daniel!
>
> I've been wondering that for ages!
>
> IE where my JDBC sourced datasets are coming up with 200 partitions on
> write to S3.
>
> What do you mean for (except for the initial read)?
>
> Can you explain that a bit further?
>
> Gary Lucas
>
> On 26 October 2017 at 11:28, Daniel Siegmann wrote:
>
>> When working with datasets, Spark uses spark.sql.shuffle.partitions. It
>> defaults to 200. Between that and the default parallelism you can control
>> the number of partitions (except for the initial read).
>>
>> More info here: http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
>>
>> I have no idea why it defaults to a fixed 200 (while default parallelism
>> defaults to a number scaled to your number of cores), or why there are two
>> separate configuration properties.
>>
>>
>> --
>> Daniel Siegmann
>> Senior Software Engineer
>> *SecurityScorecard Inc.*
>> 214 W 29th Street, 5th Floor
>> New York, NY 10001
>>
>>
>> On Thu, Oct 26, 2017 at 9:53 AM, Deepak Sharma 
>> wrote:
>>
>>> I guess the issue is spark.default.parallelism is ignored when you are
>>> working with DataFrames. It is supposed to work only with raw RDDs.
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Thu, Oct 26, 2017 at 10:05 PM, Noorul Islam Kamal Malmiyoda <
>>> noo...@noorul.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have the following spark configuration
>>>>
>>>> spark.app.name=Test
>>>> spark.cassandra.connection.host=127.0.0.1
>>>> spark.cassandra.connection.keep_alive_ms=5000
>>>> spark.cassandra.connection.port=1
>>>> spark.cassandra.connection.timeout_ms=3
>>>> spark.cleaner.ttl=3600
>>>> spark.default.parallelism=4
>>>> spark.master=local[2]
>>>> spark.ui.enabled=false
>>>> spark.ui.showConsoleProgress=false
>>>>
>>>> Because I am setting spark.default.parallelism to 4, I was expecting
>>>> only 4 spark partitions. But it looks like it is not the case
>>>>
>>>> When I do the following
>>>>
>>>> df.foreachPartition { partition =>
>>>>   val groupedPartition = partition.toList.grouped(3).toList
>>>>   println("Grouped partition " + groupedPartition)
>>>> }
>>>>
>>>> There are too many print statements with empty list at the top. Only
>>>> the relevant partitions are at the bottom. Is there a way to control
>>>> number of partitions?
>>>>
>>>> Regards,
>>>> Noorul
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>>
>


Re: Controlling number of spark partitions in dataframes

2017-10-26 Thread Daniel Siegmann
When working with datasets, Spark uses spark.sql.shuffle.partitions. It
defaults to 200. Between that and the default parallelism you can control
the number of partitions (except for the initial read).

More info here:
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

I have no idea why it defaults to a fixed 200 (while default parallelism
defaults to a number scaled to your number of cores), or why there are two
separate configuration properties.
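
If the defaults don't suit you, both can be overridden; for example (values
arbitrary, assuming a Spark 2.x session named spark):

// Shuffle partitioning used by DataFrame / Dataset operations (defaults to 200).
spark.conf.set("spark.sql.shuffle.partitions", "64")

// spark.default.parallelism applies at the SparkContext level, so it is
// usually set at submit time, e.g. --conf spark.default.parallelism=64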


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Thu, Oct 26, 2017 at 9:53 AM, Deepak Sharma 
wrote:

> I guess the issue is spark.default.parallelism is ignored when you are
> working with DataFrames. It is supposed to work only with raw RDDs.
>
> Thanks
> Deepak
>
> On Thu, Oct 26, 2017 at 10:05 PM, Noorul Islam Kamal Malmiyoda <
> noo...@noorul.com> wrote:
>
>> Hi all,
>>
>> I have the following spark configuration
>>
>> spark.app.name=Test
>> spark.cassandra.connection.host=127.0.0.1
>> spark.cassandra.connection.keep_alive_ms=5000
>> spark.cassandra.connection.port=1
>> spark.cassandra.connection.timeout_ms=3
>> spark.cleaner.ttl=3600
>> spark.default.parallelism=4
>> spark.master=local[2]
>> spark.ui.enabled=false
>> spark.ui.showConsoleProgress=false
>>
>> Because I am setting spark.default.parallelism to 4, I was expecting
>> only 4 spark partitions. But it looks like it is not the case
>>
>> When I do the following
>>
>> df.foreachPartition { partition =>
>>   val groupedPartition = partition.toList.grouped(3).toList
>>   println("Grouped partition " + groupedPartition)
>> }
>>
>> There are too many print statements with empty list at the top. Only
>> the relevant partitions are at the bottom. Is there a way to control
>> number of partitions?
>>
>> Regards,
>> Noorul
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
On Thu, Sep 28, 2017 at 7:23 AM, Gourav Sengupta 
wrote:

>
> I will be very surprised if someone tells me that a 1 GB CSV text file is
> automatically split and read by multiple executors in SPARK. It does not
> matter whether it stays in HDFS, S3 or any other system.
>

I can't speak to *any* system, but I can confirm for HDFS, S3, and local
filesystems a 1 GB CSV file would be split.


>
> Now if someone tells me that in case I have a smaller CSV file of 100MB
> size and that will be split while being read, that will also be surprising.
>

I'm not sure what the default is. It may be 128 MB, in which case that file
would not be split.

Keep in mind gzipped files cannot be split. If you have very large text
files and you want to compress them, and they will be > a few hundred MB
compressed, you should probably use bzip2 instead (which can be split).
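
A quick way to see this is to compare partition counts after reading
(a sketch; the file names are made up):

val gz = spark.read.textFile("s3://bucket/data.csv.gz")
val bz2 = spark.read.textFile("s3://bucket/data.csv.bz2")
println(gz.rdd.getNumPartitions)  // gzip: expect 1, regardless of file size
println(bz2.rdd.getNumPartitions) // bzip2: expect > 1 for a large file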


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
> Can you kindly explain how Spark uses parallelism for bigger (say 1GB)
> text file? Does it use InputFormat do create multiple splits and creates 1
> partition per split? Also, in case of S3 or NFS, how does the input split
> work? I understand for HDFS files are already pre-split so Spark can use
> dfs.blocksize to determine partitions. But how does it work other than HDFS?
>

S3 is similar to HDFS I think. I'm not sure off-hand how exactly it decides
to split for the local filesystem. But it does. Maybe someone else will be
able to explain the details.


Re: More instances = slower Spark job

2017-09-28 Thread Daniel Siegmann
> no matter what you do and how many nodes you start, in case you have a
> single text file, it will not use parallelism.
>

This is not true, unless the file is small or is gzipped (gzipped files
cannot be split).


Re: Documentation on "Automatic file coalescing for native data sources"?

2017-05-26 Thread Daniel Siegmann
Thanks for the help everyone.

It seems the automatic coalescing doesn't happen when accessing ORC data
through a Hive metastore unless you configure
spark.sql.hive.convertMetastoreOrc to be true (it is false by default). I'm
not sure if this is documented somewhere, or if there's any reason not to
enable it, but I haven't had any problem with it.
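
For reference, enabling it is just a one-liner (sketch; the table name is made up):

// Use the native ORC path, so the automatic coalescing applies.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
val df = spark.table("my_orc_table")
println(df.rdd.getNumPartitions)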


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Sat, May 20, 2017 at 9:14 PM, Kabeer Ahmed  wrote:

> Thank you Takeshi.
>
> As far as I see from the code pointed, the default number of bytes to pack
> in a partition is set to 128MB - size of the parquet block size.
>
> Daniel,
>
> It seems you do have a need to modify the number of bytes you want to pack
> per partition. I am curious to know the scenario. Please share if you can.
>
> Thanks,
> Kabeer.
>
> On May 20 2017, at 4:54 pm, Takeshi Yamamuro 
> wrote:
>
>> I think this document points to a logic here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L418
>>
>> This logic merge small files into a partition and you can control this
>> threshold via `spark.sql.files.maxPartitionBytes`.
>>
>> // maropu
>>
>>
>> On Sat, May 20, 2017 at 8:15 AM, ayan guha  wrote:
>>
>> I think like all other read operations, it is driven by the input format
>> used, and I think some variation of combine file input format is used by
>> default. I think you can test it by forcing a particular input format which
>> gets one file per split, then you should end up with the same number of
>> partitions as your data files
>>
>> On Sat, 20 May 2017 at 5:12 am, Aakash Basu 
>> wrote:
>>
>> Hey all,
>>
>> A reply on this would be great!
>>
>> Thanks,
>> A.B.
>>
>> On 17-May-2017 1:43 AM, "Daniel Siegmann" 
>> wrote:
>>
>> When using spark.read on a large number of small files, these are
>> automatically coalesced into fewer partitions. The only documentation I can
>> find on this is in the Spark 2.0.0 release notes, where it simply says (
>> http://spark.apache.org/releases/spark-release-2-0-0.html):
>>
>> "Automatic file coalescing for native data sources"
>>
>> Can anyone point me to documentation explaining what triggers this
>> feature, how it decides how many partitions to coalesce to, and what counts
>> as a "native data source"? I couldn't find any mention of this feature in
>> the SQL Programming Guide and Google was not helpful.
>>
>> --
>> Daniel Siegmann
>> Senior Software Engineer
>> *SecurityScorecard Inc.*
>> 214 W 29th Street, 5th Floor
>> New York, NY 10001
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Documentation on "Automatic file coalescing for native data sources"?

2017-05-16 Thread Daniel Siegmann
When using spark.read on a large number of small files, these are
automatically coalesced into fewer partitions. The only documentation I can
find on this is in the Spark 2.0.0 release notes, where it simply says (
http://spark.apache.org/releases/spark-release-2-0-0.html):

"Automatic file coalescing for native data sources"

Can anyone point me to documentation explaining what triggers this feature,
how it decides how many partitions to coalesce to, and what counts as a
"native data source"? I couldn't find any mention of this feature in the
SQL Programming Guide and Google was not helpful.

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: Deploying Spark Applications. Best Practices And Patterns

2017-04-12 Thread Daniel Siegmann
On Wed, Apr 12, 2017 at 4:11 PM, Sam Elamin  wrote:

>
> When it comes to scheduling Spark jobs, you can either submit to an
> already running cluster using things like Oozie or bash scripts, or have a
> workflow manager like Airflow or Data Pipeline to create new clusters for
> you. We went down the second route to continue with the whole immutable
> infrastructure / "treat your servers as cattle not pets"
>

A great overview. I just want to point out that Airflow can submit jobs to
an existing cluster if you prefer to have a shared cluster (may be ideal if
you have a bunch of smaller jobs to complete). Do keep in mind that if you
are using the EMR operator that uses the EMR add step API, these will be
submitted to YARN one at a time.


Re: [Spark] Accumulators or count()

2017-03-01 Thread Daniel Siegmann
As you noted, Accumulators do not guarantee accurate results except in
specific situations. I recommend never using them.

This article goes into some detail on the problems with accumulators:
http://imranrashid.com/posts/Spark-Accumulators/
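
If all you need is an exact count, a plain action does the job; a minimal
sketch (the data here is a stand-in):

// The count comes from the final successful tasks, so stage retries and
// speculative execution can't inflate it the way they can an accumulator.
val records = sc.parallelize(Seq("ok", "", "ok", ""))
val emptyCount = records.filter(_.isEmpty).count()
val totalCount = records.count()

The trade-off is an extra pass over the data (or caching it), but the result
is exact.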


On Wed, Mar 1, 2017 at 7:26 AM, Charles O. Bajomo <
charles.baj...@pretechconsulting.co.uk> wrote:

> Hello everyone,
>
> I wanted to know if there is any benefit to using an accumulator over just
> executing a count() on the whole RDD. There seem to be a lot of issues
> with accumulators during a stage failure, and there also seems to be an issue
> rebuilding them if the application restarts from a checkpoint. Anyone have
> any suggestions on this?
>
> Thanks
>


Re: Spark #cores

2017-01-18 Thread Daniel Siegmann
I am not too familiar with Spark Standalone, so unfortunately I cannot give
you any definite answer. I do want to clarify something though.

The properties spark.sql.shuffle.partitions and spark.default.parallelism
affect how your data is split up, which will determine the *total* number
of tasks, *NOT* the number of tasks being run in parallel. Except of course
you will never run more tasks in parallel than there are total, so if your
data is small you might be able to control it via these parameters - but
that wouldn't typically be how you'd use these parameters.

On YARN as you noted there is spark.executor.instances as well as
spark.executor.cores, and you'd multiply them to determine the maximum
number of tasks that would run in parallel on your cluster. But there is no
guarantee the executors would be distributed evenly across nodes.
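
For example, on YARN (the numbers are arbitrary; typically set in
spark-defaults.conf or via --conf at submit time):

spark.executor.instances=4
spark.executor.cores=5
# at most 4 x 5 = 20 tasks running in parallel, regardless of the total task count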

Unfortunately I'm not familiar with how this works on Spark Standalone.
Your expectations seem reasonable to me. Sorry I can't be helpful,
hopefully someone else will be able to explain exactly how this works.


Re: Why does Spark 2.0 change number or partitions when reading a parquet file?

2016-12-22 Thread Daniel Siegmann
Spark 2.0.0 introduced "Automatic file coalescing for native data sources" (
http://spark.apache.org/releases/spark-release-2-0-0.html#performance-and-runtime).
Perhaps that is the cause?

I'm not sure if this feature is mentioned anywhere in the documentation or
if there's any way to disable it.
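
If the goal is simply to get a specific partition count back, you can
repartition after the read, or nudge how files are packed into read
partitions (sketch; the numbers are arbitrary):

// Force the partition count after the read (this adds a shuffle).
val newdf = spark.read.parquet("/tmp/df").repartition(200)

// Or lower the packing threshold so the read produces more, smaller
// partitions (Spark 2.x property).
spark.conf.set("spark.sql.files.maxPartitionBytes", (16 * 1024 * 1024).toString)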


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Thu, Dec 22, 2016 at 11:09 AM, Kristina Rogale Plazonic  wrote:

> Hi,
>
> I write a randomly generated 30,000-row dataframe to parquet. I verify
> that it has 200 partitions (both in Spark and inspecting the parquet file
> in hdfs).
>
> When I read it back in, it has 23 partitions?! Is there some optimization
> going on? (This doesn't happen in Spark 1.5)
>
> *How can I force it to read back the same partitions i.e. 200?* I'm
> trying to reproduce a problem that depends on partitioning and can't
> because the number of partitions goes way down.
>
> Thanks for any insights!
> Kristina
>
> Here is the code and output:
>
> scala> spark.version
> res13: String = 2.0.2
>
> scala> df.show(2)
> +---+-----------+----------+----------+----------+----------+--------+--------+
> | id|        id2|  strfeat0|  strfeat1|  strfeat2|  strfeat3|binfeat0|binfeat1|
> +---+-----------+----------+----------+----------+----------+--------+--------+
> |  0|12345678901|fcvEmHTZte|      null|fnuAQdnBkJ|aU3puFMq5h|       1|       1|
> |  1|12345678902|      null|rtcrPaAVNX|fnuAQdnBkJ|x6NyoX662X|       0|       0|
> +---+-----------+----------+----------+----------+----------+--------+--------+
> only showing top 2 rows
>
>
> scala> df.count
> res15: Long = 30001
>
> scala> df.rdd.partitions.size
> res16: Int = 200
>
> scala> df.write.parquet("/tmp/df")
>
>
> scala> val newdf = spark.read.parquet("/tmp/df")
> newdf: org.apache.spark.sql.DataFrame = [id: int, id2: bigint ... 6 more
> fields]
>
> scala> newdf.rdd.partitions.size
> res18: Int = 23
>
>
> [kris@airisdata195 ~]$ hdfs dfs -ls /tmp/df
> Found 201 items
> -rw-r--r--   3 kris supergroup  0 2016-12-22 11:01 /tmp/df/_SUCCESS
> -rw-r--r--   3 kris supergroup   4974 2016-12-22 11:01
> /tmp/df/part-r-0-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
> -rw-r--r--   3 kris supergroup   4914 2016-12-22 11:01
> /tmp/df/part-r-1-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
> .
> . (omitted output)
> .
> -rw-r--r--   3 kris supergroup   4893 2016-12-22 11:01
> /tmp/df/part-r-00198-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
> -rw-r--r--   3 kris supergroup   4981 2016-12-22 11:01
> /tmp/df/part-r-00199-84584688-612f-49a3-a023-4a5c6d784d96.snappy.parquet
>
>


Re: Few questions on reliability of accumulators value.

2016-12-12 Thread Daniel Siegmann
Accumulators are generally unreliable and should not be used. The answer to
(2) and (4) is yes. The answer to (3) is both.

Here's a more in-depth explanation:
http://imranrashid.com/posts/Spark-Accumulators/

On Sun, Dec 11, 2016 at 11:27 AM, Sudev A C  wrote:

> Please help.
> Anyone, any thoughts on the previous mail ?
>
> Thanks
> Sudev
>
>
> On Fri, Dec 9, 2016 at 2:28 PM Sudev A C  wrote:
>
>> Hi,
>>
>> Can anyone please help clarity on how accumulators can be used reliably
>> to measure error/success/analytical metrics ?
>>
>> Given below is use case / code snippet that I have.
>>
>> val amtZero = sc.accumulator(0)
>> val amtLarge = sc.accumulator(0)
>> val amtNormal = sc.accumulator(0)
>> val getAmount = (x: org.apache.spark.sql.Row) => if (x.isNullAt(somePos))
>> {
>>   amtZero.add(1)
>>   0.0
>> } else {
>>   val amount = x.getDouble(4)
>>   if (amount > 1) amtLarge.add(1) else amtNormal.add(1)
>>   amount
>> }
>> mrdd = rdd.map(s => (s, getAmount(s)))
>> mrdd.cache()
>> another_mrdd = rdd2.map(s => (s, getAmount(s)))
>> mrdd.save_to_redshift
>> another_mrdd.save_to_redshift
>> mrdd.union(another_mrdd).map().groupByKey().save_to_redshift
>>
>>
>>
>> // Get values from accumulators and persist it to a reliable store for
>> analytics.
>> save_to_datastore(amtZero.value, amtLarge.value, amtNormal.value)
>>
>>
>>
>> Few questions :
>>
>> 1. How many times should I expect the counts for items within mrdd and
>> another_mrdd since both of these rdd's are being reused ? What happens when
>> a part of DAG is skipped due to caching in between (say I'm caching
>> only mrdd)?
>>
>> 2. Should I be worried about any possible stage/task failures (due to
>> master-wroker network issues/resource-starvation/speculative-execution),
>> can these events lead to wrong counts ?
>>
>> 3. Document says  **In transformations, users should be aware of that
>> each task’s update may be applied more than once if tasks or job stages are
>> re-executed.**
>> Here re-execution of stages/tasks is referring to failure re-executions
>> or re-execution due to stage/tasks position in DAG ?
>>
>> 4. Is it safe to say that usage of accumulators(for exact counts) are
>> limited to .foreach() as actions guarantee exactly once updates ?
>>
>> Thanks
>> Sudev
>>
>>
>>
>>


Re: CSV to parquet preserving partitioning

2016-11-15 Thread Daniel Siegmann
Did you try unioning the datasets for each CSV into a single dataset? You
may need to put the directory name into a column so you can partition by it.
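
Roughly something like this (untested sketch; assumes the Spark 2.x csv
reader and the paths from your example):

import org.apache.spark.sql.functions.{col, lit}

val dirs = Seq("dir1", "dir2", "dir3")

// Read each input directory, tag its rows with the directory name, and union them.
val all = dirs
  .map(d => spark.read.option("header", "true").csv(s"/path/$d").withColumn("dir", lit(d)))
  .reduce(_ union _)

// Repartitioning by the column puts each directory's rows into a single
// partition, so partitionBy should then produce one file per output directory.
all.repartition(col("dir"))
  .write
  .partitionBy("dir")
  .parquet("hdfs://path")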

On Tue, Nov 15, 2016 at 8:44 AM, benoitdr 
wrote:

> Hello,
>
> I'm trying to convert a bunch of csv files to parquet, with the interesting
> case that the input csv files are already "partitioned" by directory.
> All the input files have the same set of columns.
> The input files structure looks like :
>
> /path/dir1/file1.csv
> /path/dir1/file2.csv
> /path/dir2/file3.csv
> /path/dir3/file4.csv
> /path/dir3/file5.csv
> /path/dir3/file6.csv
>
> I'd like to read those files and write their data to a parquet table in
> hdfs, preserving the partitioning (partitioned by input directory), and
> such
> as there is a single output file per partition.
> The output files structure should look like :
>
> hdfs://path/dir=dir1/part-r-xxx.gz.parquet
> hdfs://path/dir=dir2/part-r-yyy.gz.parquet
> hdfs://path/dir=dir3/part-r-zzz.gz.parquet
>
>
> The best solution I have found so far is to loop among the input
> directories, loading the csv files in a dataframe and to write the
> dataframe
> in the target partition.
> But this is not efficient since I want a single output file per partition, and
> the writing to hdfs is a single task that blocks the loop.
> I wonder how to achieve this with a maximum of parallelism (and without
> shuffling the data in the cluster).
>
> Thanks !
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CSV-to-parquet-preserving-partitioning-tp28078.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Quirk in how Spark DF handles JSON input records?

2016-11-02 Thread Daniel Siegmann
Yes, it needs to be on a single line. Spark (or Hadoop really) treats
newlines as a record separator by default. While it is possible to use a
different string as a record separator, what would you use in the case of
JSON?

If you do some Googling I suspect you'll find some possible solutions.
Personally, I would just use a separate JSON library (e.g. json4s) to parse
this metadata into an object, rather than trying to read it in through
Spark.
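
For example, something along these lines (sketch; assumes json4s is on the
classpath, and the path and fields are made up):

import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

case class Metadata(name: String, version: Int)

implicit val formats = DefaultFormats

// Read the whole file as one string, so newlines inside the JSON don't matter,
// and parse it with a plain JSON library instead of spark.read.json.
val text = scala.io.Source.fromFile("/path/to/metadata.json").mkString
val meta = parse(text).extract[Metadata]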

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: UseCase_Design_Help

2016-10-05 Thread Daniel Siegmann
I think it's fine to read animal types locally because there are only 70 of
them. It's just that you want to execute the Spark actions in parallel. The
easiest way to do that is to have only a single action.

Instead of grabbing the result right away, I would just add a column for
the animal type and union the datasets for the animal types. Something like
this (not sure if the syntax is correct):

val animalCounts: DataFrame = animalTypes.map { anmtyp =>
  sqlContext.sql("select '" + anmtyp + "' as animal_type, count(distinct(" + anmtyp + ")) from TEST1")
}.reduce(_.union(_))

animalCounts.foreach( /* print the output */ )

On Wed, Oct 5, 2016 at 12:42 AM, Daniel  wrote:

> First of all, if you want to read a txt file in Spark, you should use
> sc.textFile, because you are using "Source.fromFile", so you are reading it
> with Scala standard api, so it will be read sequentially.
>
> Furthermore you are going to need create a schema if you want to use
> dataframes.
>
> On 5/10/2016 1:53, "Ajay Chander" wrote:
>
>> Right now, I am doing it like below,
>>
>> import scala.io.Source
>>
>> val animalsFile = "/home/ajay/dataset/animal_types.txt"
>> val animalTypes = Source.fromFile(animalsFile).getLines.toArray
>>
>> for ( anmtyp <- animalTypes ) {
>>   val distinctAnmTypCount = sqlContext.sql("select
>> count(distinct("+anmtyp+")) from TEST1 ")
>>   println("Calculating Metrics for Animal Type: "+anmtyp)
>>   if( distinctAnmTypCount.head().getAs[Long](0) <= 10 ){
>> println("Animal Type: "+anmtyp+" has <= 10 distinct values")
>>   } else {
>> println("Animal Type: "+anmtyp+" has > 10 distinct values")
>>   }
>> }
>>
>> But the problem is it is running sequentially.
>>
>> Any inputs are appreciated. Thank you.
>>
>>
>> Regards,
>> Ajay
>>
>>
>> On Tue, Oct 4, 2016 at 7:44 PM, Ajay Chander  wrote:
>>
>>> Hi Everyone,
>>>
>>> I have a use-case where I have two Dataframes like below,
>>>
>>> 1) First Dataframe(DF1) contains,
>>>
>>> *ANIMALS*
>>> Mammals
>>> Birds
>>> Fish
>>> Reptiles
>>> Amphibians
>>>
>>> 2) Second Dataframe(DF2) contains,
>>>
>>> *ID, Mammals, Birds, Fish, Reptiles, Amphibians*
>>> 1,  Dogs,  Eagle,  Goldfish,  NULL,  Frog
>>> 2,  Cats,  Peacock,  Guppy, Turtle,  Salamander
>>> 3,  Dolphins,  Eagle,  Zander,  NULL,  Frog
>>> 4,  Whales,  Parrot,  Guppy,  Snake,  Frog
>>> 5,  Horses,  Owl,  Guppy,  Snake,  Frog
>>> 6,  Dolphins,  Kingfisher,  Zander,  Turtle,  Frog
>>> 7,  Dogs,  Sparrow,  Goldfish,  NULL,  Salamander
>>>
>>> Now I want to take each row from DF1 and find out its distinct count in
>>> DF2. Example, pick Mammals from DF1 then find out count(distinct(Mammals))
>>> from DF2 i.e. 5
>>>
>>> DF1 has 70 distinct rows/Animal types
>>> DF2 has some million rows
>>>
>>> Whats the best way to achieve this efficiently using parallelism ?
>>>
>>> Any inputs are helpful. Thank you.
>>>
>>> Regards,
>>> Ajay
>>>
>>>
>>


Re: Access S3 buckets in multiple accounts

2016-09-28 Thread Daniel Siegmann
Thanks for the help everyone. I was able to get permissions configured for
my cluster so it now has access to the bucket on the other account.


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Wed, Sep 28, 2016 at 10:03 AM, Steve Loughran 
wrote:

>
> On 27 Sep 2016, at 15:53, Daniel Siegmann 
> wrote:
>
> I am running Spark on Amazon EMR and writing data to an S3 bucket.
> However, the data is read from an S3 bucket in a separate AWS account.
> Setting the fs.s3a.access.key and fs.s3a.secret.key values is sufficient to
> get access to the other account (using the s3a protocol), however I then
> won't have access to the S3 bucket in the EMR cluster's AWS account.
>
> Is there any way for Spark to access S3 buckets in multiple accounts? If
> not, is there any best practice for how to work around this?
>
>
>
> There are 2 ways to do this without changing permissions
>
> 1. different implementations: use s3a for one, s3n for the other, give
> them the different secrets
>
> 2. insecure: use the secrets in the URI. s3a://AWSID:escaped-secret@bucket/path
> -leaks your secrets throughout the logs, has problems with "/" in the
> password. If there is one, you'll probably need to regenerate the password.
>
> This is going to have to be fixed in the s3a implementation at some point,
> as it's not only needed for cross user auth, once you switch to v4 AWS auth
> you need to specify the appropriate s3 endpoint for your region; you can't
> just use s3 central, but need to choose s3 frankfurt, s3 seoul, etc: so
> won't be able to work with data across regions.
>


Access S3 buckets in multiple accounts

2016-09-27 Thread Daniel Siegmann
I am running Spark on Amazon EMR and writing data to an S3 bucket. However,
the data is read from an S3 bucket in a separate AWS account. Setting the
fs.s3a.access.key and fs.s3a.secret.key values is sufficient to get access
to the other account (using the s3a protocol), however I then won't have
access to the S3 bucket in the EMR cluster's AWS account.

Is there any way for Spark to access S3 buckets in multiple accounts? If
not, is there any best practice for how to work around this?
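
For reference, the keys are being set along these lines (sketch; the bucket
and placeholder values are made up):

// Credentials for the other AWS account, applied to the s3a connector.
val otherAccountAccessKey = "..." // placeholder
val otherAccountSecretKey = "..." // placeholder
sc.hadoopConfiguration.set("fs.s3a.access.key", otherAccountAccessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", otherAccountSecretKey)

val df = sqlContext.read.parquet("s3a://other-account-bucket/path/")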

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Dataset encoder for java.time.LocalDate?

2016-09-02 Thread Daniel Siegmann
It seems Spark can handle case classes with java.sql.Date, but not
java.time.LocalDate. It complains there's no encoder.

Are there any plans to add an encoder for LocalDate (and other classes in
the new Java 8 Time and Date API), or is there an existing library I can
use that provides encoders?
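
The workaround I can see for now is converting at the boundary (sketch; the
case class is made up):

import java.sql.Date
import java.time.LocalDate
import spark.implicits._ // or sqlContext.implicits._ on 1.6

// java.sql.Date already has an encoder, so keep it in the case class
// and convert to/from LocalDate at the edges.
case class Event(id: String, when: Date)

val ds = Seq(Event("a", Date.valueOf(LocalDate.of(2016, 9, 2)))).toDS()
val asLocalDates = ds.collect().map(_.when.toLocalDate)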

--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


Re: What are using Spark for

2016-08-02 Thread Daniel Siegmann
Yes, you can use Spark for ETL, as well as feature engineering, training,
and scoring.

~Daniel Siegmann

On Tue, Aug 2, 2016 at 3:29 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> If I may say, if you spend  sometime going through this mailing list in
> this forum and see the variety of topics that users are discussing, then
> you may get plenty of ideas about Spark application in real life..
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 2 August 2016 at 19:17, Karthik Ramakrishnan <
> karthik.ramakrishna...@gmail.com> wrote:
>
>> We used Storm for ETL, now currently thinking Spark might be advantageous
>> since some ML also is coming our way.
>>
>> - Karthik
>>
>> On Tue, Aug 2, 2016 at 1:10 PM, Rohit L  wrote:
>>
>>> Does anyone use Spark for ETL?
>>>
>>> On Tue, Aug 2, 2016 at 1:24 PM, Sonal Goyal 
>>> wrote:
>>>
>>>> Hi Rohit,
>>>>
>>>> You can check the powered by spark page for some real usage of Spark.
>>>>
>>>> https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark
>>>>
>>>>
>>>> On Tuesday, August 2, 2016, Rohit L  wrote:
>>>>
>>>>> Hi Everyone,
>>>>>
>>>>>   I want to know the real world uses cases for which Spark is used
>>>>> and hence can you please share for what purpose you are using Apache Spark
>>>>> in your project?
>>>>>
>>>>> --
>>>>> Rohit
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Sonal
>>>> Founder, Nube Technologies <http://www.nubetech.co>
>>>> Reifier at Strata Hadoop World
>>>> <https://www.youtube.com/watch?v=eD3LkpPQIgM>
>>>> Reifier at Spark Summit 2015
>>>> <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/>
>>>>
>>>> <http://in.linkedin.com/in/sonalgoyal>
>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>>
>


Re: Apache design patterns

2016-06-09 Thread Daniel Siegmann
On Tue, Jun 7, 2016 at 11:43 PM, Francois Le Roux 
wrote:

> 1.   Should I use dataframes to ‘pull’ the source data? If so, do I do
> a groupby and order by as part of the SQL query?
>
Seems reasonable. If you use Scala you might want to define a case class
and convert the data frame to a dataset (I find the API of the latter
easier to work with), either before or after you group.


> 2.   How do I then split the grouped data (i.e. driver ID key value
> pairs) to then be parallelized for concurrent processing (i.e. ideally the
> number of parallel datasets/grouped data should run at max node cluster
> capacity)? DO I need to do some sort of mappartitioning ?
>
Spark partitions the data. Each partition can be processed in parallel (if
you look at the list of tasks for a stage in the UI, each task is a single
partition).

The problem you could run into is if you have many records for a given
driver, since they will all end up in the same partition. Look out for that
(you can see in the UI how much data is being processed by each task).

The number of partitions for stages which read the data depend on how the
data is stored (watch out for large gzipped files, as they cannot be
split). When data is shuffled (e.g. for a group by) it will be
repartitioned, with the number of partitions being determined by the
properties spark.default.parallelism and spark.sql.shuffle.partitions.

See
http://spark.apache.org/docs/latest/configuration.html#execution-behavior
and
http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
.


> 3.   Pending (1) & (2) answers: How does each (i.e. grouped data set)
> dataframe or RDD or dataset perform these rules based checks (i.e. backward
> and forward looking checks) ? i.e. how is this achieved in SPARK?
>
Once you've grouped by driver ID, I assume you'll want to use a map
function. If you want to output multiple records (un-grouping after you've
processed them), you can use flatMap.

You might want to verify your grouped data is in row ID order. You need to
be careful about assuming things are ordered in a particular way in a
distributed system.
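
A rough sketch of the overall shape (Spark 2.x Dataset API; the method names
differ slightly on 1.6, and the case class and "checks" here are stand-ins):

import spark.implicits._

case class Reading(driverId: String, rowId: Long, value: Double)

val readings = Seq(
  Reading("d1", 2, 1.0), Reading("d1", 1, 2.0), Reading("d2", 1, 3.0)
).toDS()

// Group by driver, sort each driver's rows by row ID rather than assuming they
// arrive in order, run the backward/forward-looking checks per driver, then
// flatten the results back out. Note this pulls one driver's rows into memory,
// which is where the skew caveat above comes in.
val checked = readings
  .groupByKey(_.driverId)
  .flatMapGroups { (driverId, rows) =>
    val ordered = rows.toSeq.sortBy(_.rowId)
    ordered.map(r => (driverId, r.rowId, r.value)) // stand-in for the real checks
  }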


Re: Saving Parquet files to S3

2016-06-09 Thread Daniel Siegmann
I don't believe there's any way to output files of a specific size. What you
can do is partition your data into a number of partitions such that the
amount of data they each contain is around 1 GB.
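
For example (sketch; the total-size figure is just an estimate you'd supply,
and parquet compression will make the actual files somewhat smaller):

val df = sqlContext.read.parquet("s3a://bucket/input/") // path made up

val targetBytesPerFile = 1024L * 1024 * 1024 // ~1 GB
val estimatedTotalBytes = 50L * 1024 * 1024 * 1024 // your own estimate of the data size
val numPartitions = math.max(1, (estimatedTotalBytes / targetBytesPerFile).toInt)

// One output file is written per partition, so this gives roughly 1 GB per file.
df.repartition(numPartitions).write.parquet("s3a://bucket/output/")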

On Thu, Jun 9, 2016 at 7:51 AM, Ankur Jain  wrote:

> Hello Team,
>
>
>
> I want to write parquet files to AWS S3, but I want to size each file size
> to 1 GB.
>
> Can someone please guide me on how I can achieve the same?
>
>
>
> I am using AWS EMR with spark 1.6.1.
>
>
>
> Thanks,
>
> Ankur
> Information transmitted by this e-mail is proprietary to YASH Technologies
> and/ or its Customers and is intended for use only by the individual or
> entity to which it is addressed, and may contain information that is
> privileged, confidential or exempt from disclosure under applicable law. If
> you are not the intended recipient or it appears that this mail has been
> forwarded to you without proper authority, you are notified that any use or
> dissemination of this information in any manner is strictly prohibited. In
> such cases, please notify us immediately at i...@yash.com and delete this
> mail from your records.
>


Re: [ML] Training with bias

2016-04-12 Thread Daniel Siegmann
Yes, that's what I was looking for. Thanks.

On Tue, Apr 12, 2016 at 9:28 AM, Nick Pentreath 
wrote:

> Are you referring to fitting the intercept term? You can use
> lr.setFitIntercept (though it is true by default):
>
> scala> lr.explainParam(lr.fitIntercept)
> res27: String = fitIntercept: whether to fit an intercept term (default:
> true)
>
> On Mon, 11 Apr 2016 at 21:59 Daniel Siegmann 
> wrote:
>
>> I'm trying to understand how I can add a bias when training in Spark. I
>> have only a vague familiarity with this subject, so I hope this question
>> will be clear enough.
>>
>> Using liblinear a bias can be set - if it's >= 0, there will be an
>> additional weight appended in the model, and predicting with that model
>> will automatically append a feature for the bias.
>>
>> Is there anything similar in Spark, such as for logistic regression? The
>> closest thing I can find is MLUtils.appendBias, but this seems to
>> require manual work on both the training and scoring side. I was hoping for
>> something that would just be part of the model.
>>
>>
>> ~Daniel Siegmann
>>
>


[ML] Training with bias

2016-04-11 Thread Daniel Siegmann
I'm trying to understand how I can add a bias when training in Spark. I
have only a vague familiarity with this subject, so I hope this question
will be clear enough.

Using liblinear a bias can be set - if it's >= 0, there will be an
additional weight appended in the model, and predicting with that model
will automatically append a feature for the bias.

Is there anything similar in Spark, such as for logistic regression? The
closest thing I can find is MLUtils.appendBias, but this seems to require
manual work on both the training and scoring side. I was hoping for
something that would just be part of the model.

~Daniel Siegmann


Re: cluster randomly re-starting jobs

2016-03-21 Thread Daniel Siegmann
Never used Ambari and I don't know if this is your problem, but I have seen
similar behavior. In my case, my application failed and Hadoop kicked off a
second attempt. I didn't realize this, but when I refreshed the Spark UI,
suddenly everything seemed reset! This is because the application ID is
part of the URL, but not the attempt ID, so when the context for the second
attempt starts it will be at the same URL as the context for the first attempt.

To verify if this is the problem you could look at the application in the
Hadoop console (or whatever the equivalent is on Ambari) and see if there
are multiple attempts. You can also see it in the Spark history server
(under incomplete applications, if the second attempt is still running).

~Daniel Siegmann

On Mon, Mar 21, 2016 at 9:58 AM, Ted Yu  wrote:

> Can you provide a bit more information ?
>
> Release of Spark and YARN
>
> Have you checked Spark UI / YARN job log to see if there is some clue ?
>
> Cheers
>
> On Mon, Mar 21, 2016 at 6:21 AM, Roberto Pagliari <
> roberto.pagli...@asos.com> wrote:
>
>> I noticed that sometimes the spark cluster seems to restart the job
>> completely.
>>
>> In the Ambari UI (where I can check jobs/stages) everything that was done
>> up to a certain point is removed, and the job is restarted.
>>
>> Does anyone know what the issue could be?
>>
>> Thank you,
>>
>>
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Daniel Siegmann
Thanks for the pointer to those indexers; those are some good examples, and
a good way to go for the trainer and any scoring done in Spark. I will
definitely have to deal with scoring in non-Spark systems, though.

I think I will need to scale up beyond what single-node liblinear can
practically provide. The system will need to handle much larger sub-samples
of this data (and other projects might be larger still). Additionally, the
system needs to train many models in parallel (hyper-parameter optimization
with n-fold cross-validation, multiple algorithms, different sets of
features).

Still, I suppose we'll have to consider whether Spark is the best system
for this. For now though, my job is to see what can be achieved with Spark.



On Fri, Mar 11, 2016 at 12:45 PM, Nick Pentreath 
wrote:

> Ok, I think I understand things better now.
>
> For Spark's current implementation, you would need to map those features
> as you mention. You could also use say StringIndexer -> OneHotEncoder or
> VectorIndexer. You could create a Pipeline to deal with the mapping and
> training (e.g.
> http://spark.apache.org/docs/latest/ml-guide.html#example-pipeline).
> Pipeline supports persistence.
>
> But it depends on your scoring use case too - a Spark pipeline can be
> saved and then reloaded, but you need all of Spark dependencies in your
> serving app which is often not ideal. If you're doing bulk scoring offline,
> then it may suit.
>
> Honestly though, for that data size I'd certainly go with something like
> Liblinear :) Spark will ultimately scale better with # training examples
> for very large scale problems. However there are definitely limitations on
> model dimension and sparse weight vectors currently. There are potential
> solutions to these but they haven't been implemented as yet.
>
> On Fri, 11 Mar 2016 at 18:35 Daniel Siegmann 
> wrote:
>
>> On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath wrote:
>>
>>> Would you mind letting us know the # training examples in the datasets?
>>> Also, what do your features look like? Are they text, categorical etc? You
>>> mention that most rows only have a few features, and all rows together have
>>> a few 10,000s features, yet your max feature value is 20 million. How are
>>> you constructing your feature vectors to get a 20 million size? The only
>>> realistic way I can see this situation occurring in practice is with
>>> feature hashing (HashingTF).
>>>
>>
>> The sub-sample I'm currently training on is about 50K rows, so ... small.
>>
>> The features causing this issue are numeric (int) IDs for ... lets call
>> it "Thing". For each Thing in the record, we set the feature Thing.id to
>> a value of 1.0 in our vector (which is of course a SparseVector). I'm
>> not sure how IDs are generated for Things, but they can be large numbers.
>>
>> The largest Thing ID is around 20 million, so that ends up being the size
>> of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
>> this data. The mean number of features per record in what I'm currently
>> training against is 41, while the maximum for any given record was 1754.
>>
>> It is possible to map the features into a small set (just need to
>> zipWithIndex), but this is undesirable because of the added complexity (not
>> just for the training, but also anything wanting to score against the
>> model). It might be a little easier if this could be encapsulated within
>> the model object itself (perhaps via composition), though I'm not sure how
>> feasible that is.
>>
>> But I'd rather not bother with dimensionality reduction at all - since we
>> can train using liblinear in just a few minutes, it doesn't seem necessary.
>>
>>
>>>
>>> MultivariateOnlineSummarizer uses dense arrays, but it should be
>>> possible to enable sparse data. Though in theory, the result will tend to
>>> be dense anyway, unless you have very many entries in the input feature
>>> vector that never occur and are actually zero throughout the data set
>>> (which it seems is the case with your data?). So I doubt whether using
>>> sparse vectors for the summarizer would improve performance in general.
>>>
>>
>> Yes, that is exactly my case - the vast majority of entries in the input
>> feature vector will *never* occur. Presumably that means most of the
>> values in the aggregators' arrays will be zero.
>>
>>
>>>
>>> LR doesn't accept a sparse weight vector, as it uses dense vectors for
>>> coefficients and gradients currently. When using L1 regularization, it
>>> could support sparse weight vectors, but the current implementation doesn't
>>> do that yet.
>>>
>>
>> Good to know it is theoretically possible to implement. I'll have to give
>> it some thought. In the meantime I guess I'll experiment with coalescing
>> the data to minimize the communication overhead.
>>
>> Thanks again.
>>
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Daniel Siegmann
On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath 
wrote:

> Would you mind letting us know the # training examples in the datasets?
> Also, what do your features look like? Are they text, categorical etc? You
> mention that most rows only have a few features, and all rows together have
> a few 10,000s features, yet your max feature value is 20 million. How are
> you constructing your feature vectors to get a 20 million size? The only
> realistic way I can see this situation occurring in practice is with
> feature hashing (HashingTF).
>

The sub-sample I'm currently training on is about 50K rows, so ... small.

The features causing this issue are numeric (int) IDs for ... let's call it
"Thing". For each Thing in the record, we set the feature Thing.id to a
value of 1.0 in our vector (which is of course a SparseVector). I'm not
sure how IDs are generated for Things, but they can be large numbers.

The largest Thing ID is around 20 million, so that ends up being the size
of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
this data. The mean number of features per record in what I'm currently
training against is 41, while the maximum for any given record was 1754.

It is possible to map the features into a small set (just need to
zipWithIndex), but this is undesirable because of the added complexity (not
just for the training, but also anything wanting to score against the
model). It might be a little easier if this could be encapsulated within
the model object itself (perhaps via composition), though I'm not sure how
feasible that is.

But I'd rather not bother with dimensionality reduction at all - since we
can train using liblinear in just a few minutes, it doesn't seem necessary.
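
For what it's worth, the remapping itself is simple enough; something like
this sketch (the input shape here is made up as (label, Thing IDs) pairs).
It's carrying the index map around for scoring that adds the complexity:

import org.apache.spark.mllib.linalg.Vectors

val data = sc.parallelize(Seq((1.0, Seq(19999999L, 42L)), (0.0, Seq(42L))))

// Build a compact index over the ~10K distinct Thing IDs...
val idToIndex = data.flatMap(_._2).distinct().zipWithIndex().collectAsMap()
val numFeatures = idToIndex.size

// ...and re-encode each record against the compact feature space. Anything
// scoring against the model would need this same idToIndex map.
val remapped = data.map { case (label, ids) =>
  val indices = ids.map(id => idToIndex(id).toInt).distinct.sorted.toArray
  (label, Vectors.sparse(numFeatures, indices, Array.fill(indices.length)(1.0)))
}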


>
> MultivariateOnlineSummarizer uses dense arrays, but it should be possible
> to enable sparse data. Though in theory, the result will tend to be dense
> anyway, unless you have very many entries in the input feature vector that
> never occur and are actually zero throughout the data set (which it seems
> is the case with your data?). So I doubt whether using sparse vectors for
> the summarizer would improve performance in general.
>

Yes, that is exactly my case - the vast majority of entries in the input
feature vector will *never* occur. Presumably that means most of the values
in the aggregators' arrays will be zero.


>
> LR doesn't accept a sparse weight vector, as it uses dense vectors for
> coefficients and gradients currently. When using L1 regularization, it
> could support sparse weight vectors, but the current implementation doesn't
> do that yet.
>

Good to know it is theoretically possible to implement. I'll have to give
it some thought. In the meantime I guess I'll experiment with coalescing
the data to minimize the communication overhead.

Thanks again.


Re: Spark ML - Scaling logistic regression for many features

2016-03-10 Thread Daniel Siegmann
Hi Nick,

Thanks for the feedback and the pointers. I tried coalescing to fewer
partitions and improved the situation dramatically. As you suggested, it is
communication overhead dominating the overall runtime.

The training run I mentioned originally had 900 partitions. Each tree
aggregation has two stages, one for the original partitions, and then one
with the aggregation into a smaller number (at 900 partitions the second
stage was 30). The first tree aggregation job (the longer one) uses the
MultivariateOnlineSummarizer you mentioned, while the subsequent
aggregation jobs use LogisticAggregator (similar problem, though smaller).

I've run some tests with fewer partitions on a very similar data set. 400
partitions took 8 hours, 100 partitions took 4 hours, and 10 partitions
took 1.4 hours. I put some screenshots from the Spark UI here:
http://imgur.com/a/trRJU

Still, these numbers seem oddly high. With 10 partitions it's shuffling
only some 200 MB per job, but the median "Getting Result Time" is 2.1
minutes. I would have expected it to take *seconds* to transfer that data.

Anyway, the MultivariateOnlineSummarizer creates several arrays of doubles
equal to the size of the vector - arrays of course are inherently dense.
While this is only one iteration it is the longest, taking a significant
portion of the time by itself. LogisticAggregator meanwhile has fewer
arrays, but if you try to pass coefficients as anything other than a dense
vector it actually throws an error! Any idea why? Anyone know a reason
these aggregators *must* store their data densely, or is just an
implementation choice? Perhaps refactoring these classes to store data
sparsely would fix the issue.

On Wed, Mar 9, 2016 at 7:57 AM, Nick Pentreath 
wrote:

> Hi Daniel
>
> The bottleneck in Spark ML is most likely (a) the fact that the weight
> vector itself is dense, and (b) the related communication via the driver. A
> tree aggregation mechanism is used for computing gradient sums (see
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala#L1080
>  and
> https://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html),
> which helps efficiency, but ultimately the driver must collect that
> gradient vector and re-broadcast the updated weight vector on every
> iteration.
>
> From a quick glance, MultivariateOnlineSummarizer doesn't seem optimized
> to sparse data either (though that is only one pass of the dataset so
> doubtful it adds too much overhead).
>
> It would be helpful to understand some further details:
> 1. Some more exact timing numbers - if you could provide screenshots /
> output from the UI to indicate the stages and call sites where the time is
> being spent that would be really useful
> 2. Is this part of a pipeline, and if so, what is the contribution of
> other parts of that pipeline to overall runtime?
> 3. Some stats on input / output data sizes from the critical stages (again
> from the UI)
> 4. The dataset size (# examples, avg sparsity % per example, etc)
> 5. Related to (4), the number of partitions of your dataset
> 6. Cluster details (# nodes and spec), as well as Spark version
>
> If you have a lot of partitions, you could find performance will be better
> with fewer partitions because the communication overhead will tend to
> dominate the overall runtime.
>
> Still, 10 hours and >100GB of driver memory seems extreme for a 20 million
> size dense weight vector (which should only be a few 100MB memory), so
> perhaps something else is going on.
>
> Nick
>
> On Tue, 8 Mar 2016 at 22:55 Daniel Siegmann 
> wrote:
>
>> Just for the heck of it I tried the old MLlib implementation, but it had
>> the same scalability problem.
>>
>> Anyone familiar with the logistic regression implementation who could
>> weigh in?
>>
>> On Mon, Mar 7, 2016 at 5:35 PM, Michał Zieliński <
>> zielinski.mich...@gmail.com> wrote:
>>
>>> We're using SparseVector columns in a DataFrame, so they are definitely
>>> supported. But maybe for LR some implicit magic is happening inside.
>>>
>>> On 7 March 2016 at 23:04, Devin Jones  wrote:
>>>
>>>> I could be wrong but its possible that toDF populates a dataframe which
>>>> I understand do not support sparsevectors at the moment.
>>>>
>>>> If you use the MlLib logistic regression implementation (not ml) you
>>>> can pass the RDD[LabeledPoint] data type directly to the learner.
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>>>>
>>>> Only downside

Re: Spark ML - Scaling logistic regression for many features

2016-03-08 Thread Daniel Siegmann
Just for the heck of it I tried the old MLlib implementation, but it had
the same scalability problem.

Anyone familiar with the logistic regression implementation who could weigh
in?

On Mon, Mar 7, 2016 at 5:35 PM, Michał Zieliński <
zielinski.mich...@gmail.com> wrote:

> We're using SparseVector columns in a DataFrame, so they are definitely
> supported. But maybe for LR some implicit magic is happening inside.
>
> On 7 March 2016 at 23:04, Devin Jones  wrote:
>
>> I could be wrong but its possible that toDF populates a dataframe which I
>> understand do not support sparsevectors at the moment.
>>
>> If you use the MlLib logistic regression implementation (not ml) you can
>> pass the RDD[LabeledPoint] data type directly to the learner.
>>
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>>
>> Only downside is that you can't use the pipeline framework from spark ml.
>>
>> Cheers,
>> Devin
>>
>>
>>
>> On Mon, Mar 7, 2016 at 4:54 PM, Daniel Siegmann <
>> daniel.siegm...@teamaol.com> wrote:
>>
>>> Yes, it is a SparseVector. Most rows only have a few features, and all
>>> the rows together only have tens of thousands of features, but the vector
>>> size is ~ 20 million because that is the largest feature.
>>>
>>> On Mon, Mar 7, 2016 at 4:31 PM, Devin Jones 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Which data structure are you using to train the model? If you haven't
>>>> tried yet, you should consider the SparseVector
>>>>
>>>>
>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
>>>>
>>>>
>>>> On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann <
>>>> daniel.siegm...@teamaol.com> wrote:
>>>>
>>>>> I recently tried to train a model using
>>>>> org.apache.spark.ml.classification.LogisticRegression on a data set
>>>>> where the feature vector size was around ~20 million. It did *not* go
>>>>> well. It took around 10 hours to train on a substantial cluster.
>>>>> Additionally, it pulled a lot data back to the driver - I eventually set 
>>>>> --conf
>>>>> spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
>>>>> submitting.
>>>>>
>>>>> Attempting the same application on the same cluster with the feature
>>>>> vector size reduced to 100k took only ~ 9 minutes. Clearly there is an
>>>>> issue with scaling to large numbers of features. I'm not doing anything
>>>>> fancy in my app, here's the relevant code:
>>>>>
>>>>> val lr = new LogisticRegression().setRegParam(1)
>>>>> val model = lr.fit(trainingSet.toDF())
>>>>>
>>>>> In comparison, a coworker trained a logistic regression model on her
>>>>> *laptop* using the Java library liblinear in just a few minutes.
>>>>> That's with the ~20 million-sized feature vectors. This suggests to me
>>>>> there is some issue with Spark ML's implementation of logistic regression
>>>>> which is limiting its scalability.
>>>>>
>>>>> Note that my feature vectors are *very* sparse. The maximum feature
>>>>> is around 20 million, but I think there are only 10's of thousands of
>>>>> features.
>>>>>
>>>>> Has anyone run into this? Any idea where the bottleneck is or how this
>>>>> problem might be solved?
>>>>>
>>>>> One solution of course is to implement some dimensionality reduction.
>>>>> I'd really like to avoid this, as it's just another thing to deal with -
>>>>> not so hard to put it into the trainer, but then anything doing scoring
>>>>> will need the same logic. Unless Spark ML supports this out of the box? An
>>>>> easy way to save / load a model along with the dimensionality reduction
>>>>> logic so when transform is called on the model it will handle the
>>>>> dimensionality reduction transparently?
>>>>>
>>>>> Any advice would be appreciated.
>>>>>
>>>>> ~Daniel Siegmann
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Daniel Siegmann
Yes, it is a SparseVector. Most rows only have a few features, and all the
rows together only have tens of thousands of features, but the vector size
is ~ 20 million because that is the largest feature.

On Mon, Mar 7, 2016 at 4:31 PM, Devin Jones 
wrote:

> Hi,
>
> Which data structure are you using to train the model? If you haven't
> tried yet, you should consider the SparseVector
>
>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.SparseVector
>
>
> On Mon, Mar 7, 2016 at 4:03 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> I recently tried to train a model using
>> org.apache.spark.ml.classification.LogisticRegression on a data set
>> where the feature vector size was around ~20 million. It did *not* go
>> well. It took around 10 hours to train on a substantial cluster.
>> Additionally, it pulled a lot data back to the driver - I eventually set 
>> --conf
>> spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
>> submitting.
>>
>> Attempting the same application on the same cluster with the feature
>> vector size reduced to 100k took only ~ 9 minutes. Clearly there is an
>> issue with scaling to large numbers of features. I'm not doing anything
>> fancy in my app, here's the relevant code:
>>
>> val lr = new LogisticRegression().setRegParam(1)
>> val model = lr.fit(trainingSet.toDF())
>>
>> In comparison, a coworker trained a logistic regression model on her
>> *laptop* using the Java library liblinear in just a few minutes. That's
>> with the ~20 million-sized feature vectors. This suggests to me there is
>> some issue with Spark ML's implementation of logistic regression which is
>> limiting its scalability.
>>
>> Note that my feature vectors are *very* sparse. The maximum feature is
>> around 20 million, but I think there are only 10's of thousands of features.
>>
>> Has anyone run into this? Any idea where the bottleneck is or how this
>> problem might be solved?
>>
>> One solution of course is to implement some dimensionality reduction. I'd
>> really like to avoid this, as it's just another thing to deal with - not so
>> hard to put it into the trainer, but then anything doing scoring will need
>> the same logic. Unless Spark ML supports this out of the box? An easy way
>> to save / load a model along with the dimensionality reduction logic so
>> when transform is called on the model it will handle the dimensionality
>> reduction transparently?
>>
>> Any advice would be appreciated.
>>
>> ~Daniel Siegmann
>>
>
>


Spark ML - Scaling logistic regression for many features

2016-03-07 Thread Daniel Siegmann
I recently tried to train a model using
org.apache.spark.ml.classification.LogisticRegression on a data set where
the feature vector size was around ~20 million. It did *not* go well. It
took around 10 hours to train on a substantial cluster. Additionally, it
pulled a lot data back to the driver - I eventually set --conf
spark.driver.memory=128g --conf spark.driver.maxResultSize=112g when
submitting.

Attempting the same application on the same cluster with the feature vector
size reduced to 100k took only ~ 9 minutes. Clearly there is an issue with
scaling to large numbers of features. I'm not doing anything fancy in my
app, here's the relevant code:

val lr = new LogisticRegression().setRegParam(1)
val model = lr.fit(trainingSet.toDF())

In comparison, a coworker trained a logistic regression model on her
*laptop* using the Java library liblinear in just a few minutes. That's
with the ~20 million-sized feature vectors. This suggests to me there is
some issue with Spark ML's implementation of logistic regression which is
limiting its scalability.

Note that my feature vectors are *very* sparse. The maximum feature index is
around 20 million, but I think there are only tens of thousands of distinct features.

Has anyone run into this? Any idea where the bottleneck is or how this
problem might be solved?

One solution of course is to implement some dimensionality reduction. I'd
really like to avoid this, as it's just another thing to deal with - not so
hard to put it into the trainer, but then anything doing scoring will need
the same logic. Unless Spark ML supports this out of the box? An easy way
to save / load a model along with the dimensionality reduction logic so
when transform is called on the model it will handle the dimensionality
reduction transparently?
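
To make that last part concrete, roughly what I have in mind (ChiSqSelector is
just a placeholder transformer here, and I haven't tested whether it helps at
this scale):

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.ChiSqSelector

val selector = new ChiSqSelector()
  .setNumTopFeatures(100000)   // made-up target dimensionality
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")
val lr = new LogisticRegression().setRegParam(1).setFeaturesCol("selectedFeatures")
val pipeline = new Pipeline().setStages(Array(selector, lr))
val model = pipeline.fit(trainingSet.toDF())
// model.transform(newData) would then apply the reduction and the classifier together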

Any advice would be appreciated.

~Daniel Siegmann


Re: Serializing collections in Datasets

2016-03-03 Thread Daniel Siegmann
I have confirmed this is fixed in Spark 1.6.1 RC 1. Thanks.

On Tue, Feb 23, 2016 at 1:32 PM, Daniel Siegmann <
daniel.siegm...@teamaol.com> wrote:

> Yes, I will test once 1.6.1 RC1 is released. Thanks.
>
> On Mon, Feb 22, 2016 at 6:24 PM, Michael Armbrust 
> wrote:
>
>> I think this will be fixed in 1.6.1.  Can you test when we post the first
>> RC? (hopefully later today)
>>
>> On Mon, Feb 22, 2016 at 1:51 PM, Daniel Siegmann <
>> daniel.siegm...@teamaol.com> wrote:
>>
>>> Experimenting with datasets in Spark 1.6.0 I ran into a serialization
>>> error when using case classes containing a Seq member. There is no
>>> problem when using Array instead. Nor is there a problem using RDD or
>>> DataFrame (even if converting the DF to a DS later).
>>>
>>> Here's an example you can test in the Spark shell:
>>>
>>> import sqlContext.implicits._
>>>
>>> case class SeqThing(id: String, stuff: Seq[Int])
>>> val seqThings = Seq(SeqThing("A", Seq()))
>>> val seqData = sc.parallelize(seqThings)
>>>
>>> case class ArrayThing(id: String, stuff: Array[Int])
>>> val arrayThings = Seq(ArrayThing("A", Array()))
>>> val arrayData = sc.parallelize(arrayThings)
>>>
>>>
>>> // Array works fine
>>> arrayData.collect()
>>> arrayData.toDF.as[ArrayThing]
>>> arrayData.toDS
>>>
>>> // Seq can't convert directly to DS
>>> seqData.collect()
>>> seqData.toDF.as[SeqThing]
>>> seqData.toDS // Serialization exception
>>>
>>> Is this working as intended? Are there plans to support serializing
>>> arbitrary Seq values in datasets, or must everything be converted to
>>> Array?
>>>
>>> ~Daniel Siegmann
>>>
>>
>>
>


Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-02 Thread Daniel Siegmann
In the past I have seen this happen when I filled up HDFS and some core
nodes became unhealthy. There was no longer anywhere to replicate the data.
From your command it looks like you should have 1 master and 2 core nodes
in your cluster. Can you verify both the core nodes are healthy?

On Wed, Mar 2, 2016 at 6:01 AM, Oleg Ruchovets  wrote:

> Here is my command:
>aws emr create-cluster --release-label emr-4.3.0 --name "ClusterJava8"
> --use-default-roles   --applications  Name=Ganglia Name=Hive Name=Hue
> Name=Mahout Name=Pig  Name=Spark  --ec2-attributes KeyName=CC-ES-Demo
>  --instance-count 3 --instance-type m3.xlarge  --use-default-roles
> --bootstrap-action Path=s3://crayon-emr-scripts/emr_java_8.sh
>
> I am using bootstrap script to install java 8.
>
> When I choose applications (Name=Ganglia Name=Hive Name=Hue Name=Mahout
> Name=Pig  Name=Spark) problem is gone. I fixed on the way Lzo not found
> exception. Now I have another problem that I have no idea why it happens:
> I tries to copy file to hdfs and got this exception (file is very small ,
> just couple of kb).
>
>
>
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of
> minReplication (=1).  There are 0 datanode(s) running and no node(s) are
> excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1476)
> at org.apache.hadoop.ipc.Client.call(Client.java:1407)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:238)
> at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1441)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)
> put: File /input/test.txt._COPYING_ could only be replicated to 0 nodes
> instead of minReplication (=1).  There are 0 datanode(s) running and no
> node(s) are excluded in this operation.
>
>
> On Wed, Mar 2, 2016 at 4:09 AM, Gourav Sengupta  > wrote:
>
>> Hi,
>>
>> which region are you using the EMR clusters from? Is there any tweaking
>> of the HADOOP parameters that you are doing before starting the clusters?
>>
>> If you are using AWS CLI to start the cluster just send across the
>> command.
>>
>> I have, never till date, faced any such issues in the Ireland region.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Mar 1, 2016 at 9:15 AM, Oleg Ruchovets 
>> wrote:
>>
>>> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell
>>> but it looks it does't work and throws exceptions.
>>> Please advice:
>>>
>>> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
>>> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
>>> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
>>> support was removed in 8.0
>>> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
>>> 16/0

Re: EMR 4.3.0 spark 1.6 shell problem

2016-03-01 Thread Daniel Siegmann
How many core nodes does your cluster have?

On Tue, Mar 1, 2016 at 4:15 AM, Oleg Ruchovets  wrote:

> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell but
> it looks it does't work and throws exceptions.
> Please advice:
>
> [hadoop@ip-172-31-39-37 conf]$ cd  /usr/bin/
> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M;
> support was removed in 8.0
> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
> 16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class
> server' on port 47223.
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 1.6.0
>   /_/
>
> Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
> Type in expressions to have them evaluated.
> Type :help for more information.
> 16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
> 16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
> 16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(hadoop); users
> with modify permissions: Set(hadoop)
> 16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver'
> on port 52143.
> 16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
> 16/03/01 09:11:54 INFO Remoting: Starting remoting
> 16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
> 16/03/01 09:11:54 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 42989.
> 16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
> 16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
> 16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at
> /mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
> 16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity
> 518.1 MB
> 16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on
> port 4040.
> 16/03/01 09:11:55 INFO SparkUI: Started SparkUI at
> http://172.31.39.37:4040
> 16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at /
> 172.31.39.37:8032
> 16/03/01 09:11:55 INFO Client: Requesting a new application from cluster
> with 2 NodeManagers
> 16/03/01 09:11:55 INFO Client: Verifying our application has not requested
> more than the maximum memory capability of the cluster (11520 MB per
> container)
> 16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB
> memory including 384 MB overhead
> 16/03/01 09:11:55 INFO Client: Setting up container launch context for our
> AM
> 16/03/01 09:11:55 INFO Client: Setting up the launch environment for our
> AM container
> 16/03/01 09:11:55 INFO Client: Preparing resources for our AM container
> 16/03/01 09:11:56 INFO Client: Uploading resource
> file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar ->
> hdfs://
> 172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> 16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord
> disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec:
> 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500
> lastModified: 1456818856695
> 16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver
> j-2FT6QNFSPTHNX:i-5f6bcadb:SparkSubmit:04807 period:60
> /mnt/var/em/raw/i-5f6bcadb_20160301_SparkSubmit_04807_raw.bin
> 16/03/01 09:11:56 WARN DFSClient: DataStreamer Exception
> org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
> /user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar
> could only be replicated to 0 nodes instead of minReplication (=1).  There
> are 0 datanode(s) running and no node(s) are excluded in this operation.
> at
> org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
> at

Re: Serializing collections in Datasets

2016-02-23 Thread Daniel Siegmann
Yes, I will test once 1.6.1 RC1 is released. Thanks.

On Mon, Feb 22, 2016 at 6:24 PM, Michael Armbrust 
wrote:

> I think this will be fixed in 1.6.1.  Can you test when we post the first
> RC? (hopefully later today)
>
> On Mon, Feb 22, 2016 at 1:51 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> Experimenting with datasets in Spark 1.6.0 I ran into a serialization
>> error when using case classes containing a Seq member. There is no
>> problem when using Array instead. Nor is there a problem using RDD or
>> DataFrame (even if converting the DF to a DS later).
>>
>> Here's an example you can test in the Spark shell:
>>
>> import sqlContext.implicits._
>>
>> case class SeqThing(id: String, stuff: Seq[Int])
>> val seqThings = Seq(SeqThing("A", Seq()))
>> val seqData = sc.parallelize(seqThings)
>>
>> case class ArrayThing(id: String, stuff: Array[Int])
>> val arrayThings = Seq(ArrayThing("A", Array()))
>> val arrayData = sc.parallelize(arrayThings)
>>
>>
>> // Array works fine
>> arrayData.collect()
>> arrayData.toDF.as[ArrayThing]
>> arrayData.toDS
>>
>> // Seq can't convert directly to DS
>> seqData.collect()
>> seqData.toDF.as[SeqThing]
>> seqData.toDS // Serialization exception
>>
>> Is this working as intended? Are there plans to support serializing
>> arbitrary Seq values in datasets, or must everything be converted to
>> Array?
>>
>> ~Daniel Siegmann
>>
>
>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Daniel Siegmann
During testing you will typically be using some finite data. You want the
stream to shut down automatically when that data has been consumed so your
test shuts down gracefully.

Of course once the code is running in production you'll want it to keep
waiting for new records. So whether the stream shuts down when there's no
more data should be configurable.
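
One way to wire that up, purely as a sketch (the listener class, the extra
thread, and the threshold of 3 empty batches are all illustrative):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class StopOnIdle(ssc: StreamingContext, maxEmptyBatches: Int) extends StreamingListener {
  private val emptyBatches = new AtomicInteger(0)
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    if (batch.batchInfo.numRecords == 0) {
      if (emptyBatches.incrementAndGet() >= maxEmptyBatches) {
        // stop from another thread so we don't block the listener bus
        new Thread("stop-on-idle") {
          override def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = true)
        }.start()
      }
    } else {
      emptyBatches.set(0)
    }
  }
}

// e.g. only register it in tests:
// ssc.addStreamingListener(new StopOnIdle(ssc, maxEmptyBatches = 3))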



On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar 
wrote:

> Just out of curiosity I will like to know why a streaming program should
> shutdown when no new data is arriving?  I think it should keep waiting for
> arrival of new records.
>
> Thanks
> Ashutosh
>
> On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat 
> wrote:
>
>> A guess - parseRecord is returning None in some cases (probably empty
>> lines). And then entry.get is throwing the exception.
>>
>> You may want to filter the None values from accessLogDStream before you
>> run the map function over it.
>>
>> Hemant
>>
>> Hemant Bhanawat 
>> www.snappydata.io
>>
>> On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu  wrote:
>>
>>> Which line is line 42 in your code ?
>>>
>>> When variable lines becomes empty, you can stop your program.
>>>
>>> Cheers
>>>
>>> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
>>>
>>> I am working on Spark Streaming API and I wish to stream a set of
>>> pre-downloaded web log files continuously to simulate a real-time stream. I
>>> wrote a script that gunzips the compressed logs and pipes the output to nc
>>> on port .
>>>
>>> The script looks like this:
>>>
>>> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
>>> zipped_files=`find $BASEDIR -name "*.gz"`
>>>
>>> for zfile in $zipped_files
>>>  do
>>>   echo "Unzipping $zfile..."
>>>   gunzip -c $zfile  | nc -l -p  -q 20
>>>
>>>  done
>>>
>>> I have streaming code written in Scala that processes the streams. It
>>> works well for the most part, but when its run out of files to stream I get
>>> the following error in Spark:
>>>
>>> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
>>> Restarting receiver with delay 2000 ms: Socket data stream had no more data
>>> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
>>> Restarting receiver with delay 2000ms: Socket data stream had no more data
>>> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
>>> to only 0 peer(s) instead of 1 peers
>>> 
>>> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 
>>> 47)
>>> java.util.NoSuchElementException: None.get
>>> at scala.None$.get(Option.scala:313)
>>> at scala.None$.get(Option.scala:311)
>>> at 
>>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>> at 
>>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>>
>>> How to I implement a graceful shutdown so that the program exits
>>> gracefully when it no longer detects any data in the stream ?
>>>
>>> My Spark Streaming code looks like this:
>>>
>>> object StreamingLogEnhanced {
>>>  def main(args: Array[String]) {
>>>   val master = args(0)
>>>   val conf = new
>>>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>>>  // Create a StreamingContext with a n second batch size
>>>   val ssc = new StreamingContext(conf, Seconds(10))
>>>  // Create a DStream from all the input on port 
>>>   val log = Logger.getLogger(getClass.getName)
>>>
>>>   sys.ShutdownHookThread {
>>>   log.info("Gracefully stopping Spark Streaming Application")
>>>   ssc.stop(true, true)
>>>   log.info("Application stopped")
>>>   }
>>>   val lines = ssc.socketTextStream("localhost", )
>>>   // Create a count of log hits by ip
>>>   var ipCounts=countByIp(lines)
>>>   ipCounts.print()
>>>
>>>   // start our streaming context and wait for it to "finish"
>>>   ssc.start()
>>>   // Wait for 600 seconds then exit
>>>   ssc.awaitTermination(1*600)
>>>   ssc.stop()
>>>   }
>>>
>>>  def countByIp(lines: DStream[String]) = {
>>>val parser = new AccessLogParser
>>>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>>>val ipDStream = accessLogDStream.map(entry =>
>>> (entry.get.clientIpAddress, 1))
>>>ipDStream.reduceByKey((x, y) => x + y)
>>>  }
>>>
>>> }
>>>
>>> Thanks for any suggestions in advance.
>>>
>>>
>>
>


Serializing collections in Datasets

2016-02-22 Thread Daniel Siegmann
Experimenting with datasets in Spark 1.6.0 I ran into a serialization error
when using case classes containing a Seq member. There is no problem when
using Array instead. Nor is there a problem using RDD or DataFrame (even if
converting the DF to a DS later).

Here's an example you can test in the Spark shell:

import sqlContext.implicits._

case class SeqThing(id: String, stuff: Seq[Int])
val seqThings = Seq(SeqThing("A", Seq()))
val seqData = sc.parallelize(seqThings)

case class ArrayThing(id: String, stuff: Array[Int])
val arrayThings = Seq(ArrayThing("A", Array()))
val arrayData = sc.parallelize(arrayThings)


// Array works fine
arrayData.collect()
arrayData.toDF.as[ArrayThing]
arrayData.toDS

// Seq can't convert directly to DS
seqData.collect()
seqData.toDF.as[SeqThing]
seqData.toDS // Serialization exception

Is this working as intended? Are there plans to support serializing
arbitrary Seq values in datasets, or must everything be converted to Array?

~Daniel Siegmann


Re: Is this likely to cause any problems?

2016-02-19 Thread Daniel Siegmann
With EMR supporting Spark, I don't see much reason to use the spark-ec2
script unless it is important for you to be able to launch clusters using
the bleeding edge version of Spark. EMR does seem to do a pretty decent job
of keeping up to date - the latest version (4.3.0) supports the latest
Spark version (1.6.0).

So I'd flip the question around and ask: is there any reason to continue
using the spark-ec2 script rather than EMR?

On Thu, Feb 18, 2016 at 11:39 AM, James Hammerton  wrote:

> I have now... So far  I think the issues I've had are not related to this,
> but I wanted to be sure in case it should be something that needs to be
> patched. I've had some jobs run successfully but this warning appears in
> the logs.
>
> Regards,
>
> James
>
> On 18 February 2016 at 12:23, Ted Yu  wrote:
>
>> Have you seen this ?
>>
>> HADOOP-10988
>>
>> Cheers
>>
>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>
>>> HI,
>>>
>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>
>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>> disabled stack guard. The VM will try to fix the stack guard now.
>>> It's highly recommended that you fix the library with 'execstack -c 
>>> ', or link it with '-z noexecstack'.
>>>
>>>
>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>> is m4.large.
>>>
>>> Could this contribute to any problems running the jobs?
>>>
>>> Regards,
>>>
>>> James
>>>
>>
>>
>


Re: data type transform when creating an RDD object

2016-02-17 Thread Daniel Siegmann
This should do it (for the implementation of your parseDate method, Google
should easily provide information - SimpleDateFormat is probably what
you want):

def parseDate(s: String): java.sql.Date = { ... }
val people = sc.textFile("examples/src/main/resources/people.txt")
   .map(_.split(","))
   .map(p => Person(p(0), parseDate(p(1))))
   .toDF()
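
A minimal implementation, assuming the DOB strings look like "1990-01-31":

def parseDate(s: String): java.sql.Date = {
  val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd")
  new java.sql.Date(fmt.parse(s.trim).getTime)
}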


On Wed, Feb 17, 2016 at 11:47 AM, Lin, Hao  wrote:

> Hi,
>
>
>
> Quick question on data type transform when creating RDD object.
>
>
>
> I want to create a person object with “name” and DOB(date of birth):
>
> case class Person(name: String, DOB: java.sql.Date)
>
>
>
> then I want to create an RDD from a text file without the header, e.g.
> “name” and “DOB”. I have problem of the following expression, because p(1)
> is previously defined as java.sql.Date in the case class:
>
>
>
> val people =
> sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p
> => Person(p(0), p(1))).toDF()
>
>
>
> 36: error: type mismatch;
>
> found   : String
>
> required: java.sql.Date
>
>
>
> so how do I transform p(1) in the above expression to java.sql.Date.
>
>
>
> any help will be appreciated here.
>
>
>
> thanks
> Confidentiality Notice:: This email, including attachments, may include
> non-public, proprietary, confidential or legally privileged information. If
> you are not an intended recipient or an authorized agent of an intended
> recipient, you are hereby notified that any dissemination, distribution or
> copying of the information contained in or transmitted with this e-mail is
> unauthorized and strictly prohibited. If you have received this email in
> error, please notify the sender by replying to this message and permanently
> delete this e-mail, its attachments, and any copies of it immediately. You
> should not retain, copy or use this e-mail or any attachment for any
> purpose, nor disclose all or any part of the contents to any other person.
> Thank you.
>


Re: Spark 2.0.0 release plan

2016-01-27 Thread Daniel Siegmann
Will there continue to be monthly releases on the 1.6.x branch during the
additional time for bug fixes and such?

On Tue, Jan 26, 2016 at 11:28 PM, Koert Kuipers  wrote:

> thanks thats all i needed
>
> On Tue, Jan 26, 2016 at 6:19 PM, Sean Owen  wrote:
>
>> I think it will come significantly later -- or else we'd be at code
>> freeze for 2.x in a few days. I haven't heard anyone discuss this
>> officially but had batted around May or so instead informally in
>> conversation. Does anyone have a particularly strong opinion on that?
>> That's basically an extra 3 month period.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
>>
>> On Tue, Jan 26, 2016 at 10:00 PM, Koert Kuipers 
>> wrote:
>> > Is the idea that spark 2.0 comes out roughly 3 months after 1.6? So
>> > quarterly release as usual?
>> > Thanks
>>
>
>


Re: Too many tasks killed the scheduler

2016-01-12 Thread Daniel Siegmann
As I understand it, your initial number of partitions will always depend on
the initial data. I'm not aware of any way to change this, other than
changing the configuration of the underlying data store.

Have you tried reading the data in several data frames (e.g. one data frame
per day), coalescing each data frame, and *then* unioning them? You could
try with and without a shuffle. Not sure if it'll work, but might be worth
a shot.
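
Roughly what I mean (the paths, day list, and partition count are made up):

val days = Seq("2016-01-01", "2016-01-02", "2016-01-03")
val perDay = days.map(day => sqlContext.read.parquet(s"/data/events/$day").coalesce(200))
val all = perDay.reduce((a, b) => a.unionAll(b))  // unionAll keeps the partitions of both sides
all.count()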

On Mon, Jan 11, 2016 at 8:39 PM, Gavin Yue  wrote:

> Thank you for the suggestion.
>
> I tried the df.coalesce(1000).write.parquet() and yes, the parquet file
> number drops to 1000, but the partition count of the parquet is still like
> 5000+. When I read the parquet and do a count, it still has 5000+ tasks.
>
> So I guess I need to do a repartition here to drop task number?  But
> repartition never works for me, always failed due to out of memory.
>
> And regarding the large number task delay problem, I found a similar
> problem: https://issues.apache.org/jira/browse/SPARK-7447.
>
> I am unionAll-ing like 10 parquet folders, with 70K+ parquet files in total,
> generating 70K+ tasks. It took around 5-8 mins before all tasks start, just
> like the ticket above.
>
> It also happens if I do a partition discovery with base path.Is there
> any schema inference or checking doing, which causes the slowness?
>
> Thanks,
> Gavin
>
>
>
> On Mon, Jan 11, 2016 at 1:21 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you use "coalesce" to reduce the number of partitions?
>>
>>
>> Shixiong Zhu
>>
>>
>> On Mon, Jan 11, 2016 at 12:21 AM, Gavin Yue 
>> wrote:
>>
>>> Here is more info.
>>>
>>> The job stuck at:
>>> INFO cluster.YarnScheduler: Adding task set 1.0 with 79212 tasks
>>>
>>> Then got the error:
>>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
>>> after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
>>>
>>> So I increased spark.network.timeout from 120s to 600s.  It sometimes
>>> works.
>>>
>>> Each task is a parquet file.  I could not repartition due to out of GC
>>> problem.
>>>
>>> Is there any way I could to improve the performance?
>>>
>>> Thanks,
>>> Gavin
>>>
>>>
>>> On Sun, Jan 10, 2016 at 1:51 AM, Gavin Yue 
>>> wrote:
>>>
 Hey,

 I have 10 days data, each day has a parquet directory with over 7000
 partitions.
 So when I union 10 days and do a count, then it submits over 70K tasks.

 Then the job failed silently with one container exit with code 1.  The
 union with like 5, 6 days data is fine.
 In the spark-shell, it just hang showing: Yarn scheduler submit 7+
 tasks.

 I am running spark 1.6 over hadoop 2.7.  Is there any setting I could
 change to make this work?

 Thanks,
 Gavin



>>>
>>
>


Zip data frames

2015-12-29 Thread Daniel Siegmann
RDD has methods to zip with another RDD or with an index, but there's no
equivalent for data frames. Anyone know a good way to do this?

I thought I could just convert to RDD, do the zip, and then convert back,
but ...

   1. I don't see a way (outside developer API) to convert RDD[Row]
   directly back to DataFrame. Is there really no way to do this?
   2. I don't see any way to modify Row objects or create new rows with
   additional columns. In other words, no way to convert RDD[(Row, Row)] to
   RDD[Row]

It seems the only way to get what I want is to extract out the data into a
case class and then convert back to a data frame. Did I miss something?
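
For reference, the closest workaround I've found relies on the createDataFrame
overload that takes an RDD[Row] and a schema (the one marked as a developer
API); the "index" column name here is arbitrary:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val indexedRows = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
val schema = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
val indexed = sqlContext.createDataFrame(indexedRows, schema)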


Re: DataFrame Vs RDDs ... Which one to use When ?

2015-12-28 Thread Daniel Siegmann
DataFrames are a higher level API for working with tabular data - RDDs are
used underneath. You can use either and easily convert between them in your
code as necessary.

DataFrames provide a nice abstraction for many cases, so it may be easier
to code against them. Though if you're used to thinking in terms of
collections rather than tables, you may find RDDs more natural. Data frames
can also be faster, since Spark will do some optimizations under the hood -
if you are using PySpark, this also avoids much of the Python serialization
overhead, since the work stays in the JVM. Data frames may
also perform better if you're reading structured data, such as a Hive table
or Parquet files.

I recommend you prefer data frames, switching over to RDDs as necessary
(when you need to perform an operation not supported by data frames / Spark
SQL).
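
For example, switching between the two is only a line or two (Scala; the case
class and values are illustrative):

import sqlContext.implicits._

case class Person(name: String, age: Int)
val rdd = sc.parallelize(Seq(Person("Ann", 34), Person("Bob", 25)))
val df = rdd.toDF()   // RDD -> DataFrame, column names taken from the case class
val rows = df.rdd     // DataFrame -> RDD[Row]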

HOWEVER (and this is a big one), Spark 1.6 will have yet another API -
datasets. The release of Spark 1.6 is currently being finalized and I would
expect it in the next few days. You will probably want to use the new API
once it's available.


On Sun, Dec 27, 2015 at 9:18 PM, Divya Gehlot 
wrote:

> Hi,
> I am a newbie to Spark and a bit confused about RDDs and DataFrames in Spark.
> Can somebody explain me with the use cases which one to use when ?
>
> Would really appreciate the clarification .
>
> Thanks,
> Divya
>


Re: is repartition very cost

2015-12-09 Thread Daniel Siegmann
Each node can have any number of partitions. Spark will try to have a node
process partitions which are already on the node for best performance (if
you look at the list of tasks in the UI, look under the locality level
column).

As a rule of thumb, you probably want 2-3 times the number of partitions as
you have executors. This helps distribute the work evenly. You would need
to experiment to find the best number for your own case.

If you're reading from a distributed data store (such as HDFS), you should
expect the data to already be partitioned. Any time a shuffle is performed
the data will be repartitioned into a number of partitions equal to the
spark.default.parallelism setting (see
http://spark.apache.org/docs/latest/configuration.html), but most
operations which cause a shuffle also take an optional parameter to set a
different value. If using data frames, use spark.sql.shuffle.partitions.
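
To illustrate the knobs mentioned above (the values here are arbitrary):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val counts = pairs.reduceByKey(_ + _, 64)                 // per-operation partition count
sqlContext.setConf("spark.sql.shuffle.partitions", "64")  // shuffle partitions for data frames
val fewer = counts.coalesce(8)                            // shrink partitions without a full shuffle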

I recommend you do not do any explicit partitioning or mess with these
values until you find a need for it. If executors are sitting idle, that's
a sign you may need to repartition.


On Tue, Dec 8, 2015 at 9:35 PM, Zhiliang Zhu 
wrote:

> Thanks very much for Yong's help.
>
> Sorry for one more issue: is it the case that different partitions must be
> on different nodes? That is, would each node only have one partition in
> cluster mode ...
>
>
>
> On Wednesday, December 9, 2015 6:41 AM, "Young, Matthew T" <
> matthew.t.yo...@intel.com> wrote:
>
>
> Shuffling large amounts of data over the network is expensive, yes. The
> cost is lower if you are just using a single node where no networking needs
> to be involved to do the repartition (using Spark as a multithreading
> engine).
>
> In general you need to do performance testing to see if a repartition is
> worth the shuffle time.
>
> A common model is to repartition the data once after ingest to achieve
> parallelism and avoid shuffles whenever possible later.
>
> *From:* Zhiliang Zhu [mailto:zchl.j...@yahoo.com.INVALID]
> *Sent:* Tuesday, December 08, 2015 5:05 AM
> *To:* User 
> *Subject:* is repartition very cost
>
>
> Hi All,
>
> I need to optimize an objective function with some linear constraints using
> a genetic algorithm. I would like to get as much parallelism for it as
> possible with Spark.
>
> repartition / shuffle may be used sometimes in it; however, is the
> repartition API very costly?
>
> Thanks in advance!
> Zhiliang
>
>
>
>
>


Re: Unit tests of spark application

2015-07-10 Thread Daniel Siegmann
On Fri, Jul 10, 2015 at 1:41 PM, Naveen Madhire 
wrote:

> I want to write JUnit test cases in Scala for testing a Spark application.
> Is there any guide or link which I can refer to?
>

https://spark.apache.org/docs/latest/programming-guide.html#unit-testing

Typically I create test data using SparkContext.parallelize and then call
RDD.collect to get the results to assert.
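
A minimal sketch of that pattern with ScalaTest (the word-count logic is just a
stand-in for whatever you're testing):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

class WordCountSpec extends FunSuite {
  test("counts words") {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("test"))
    try {
      val counts = sc.parallelize(Seq("a b", "a"))
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)
        .collect()
        .toMap
      assert(counts === Map("a" -> 2, "b" -> 1))
    } finally {
      sc.stop()
    }
  }
}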


Re: Getting started with spark-scala development in eclipse.

2015-07-08 Thread Daniel Siegmann
To set up Eclipse for Spark you should install the Scala IDE plugins:
http://scala-ide.org/download/current.html

Define your project in Maven with Scala plugins configured (you should be
able to find documentation online) and import as an existing Maven project.
The source code should be in src/main/scala but otherwise the project
structure will be the same as you'd expect in Java.

Nothing special is needed for Spark. Just define the desired Spark jars (
spark-core and possibly others, such as spark-sql) in your Maven POM as
dependencies. You should scope these dependencies as provided, since they
will automatically be on the classpath when you deploy your project to a
Spark cluster.

One thing to keep in mind is that Scala dependencies require separate jars
for different versions of Scala, and it is convention to append the Scala
version to the artifact ID. For example, if you are using Scala 2.11.x,
your dependency will be spark-core_2.11 (look on search.maven.org if you're
not sure). I think you can omit the Scala version if you're using SBT (not
sure why you would, but some people seem to prefer it).
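
For example, with sbt the %% operator appends the Scala suffix for you (the
version numbers below are just placeholders):

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql"  % "1.6.0" % "provided"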

Unit testing Spark is briefly explained in the programming guide.

To deploy using spark-submit you can build the jar using mvn package if and
only if you don't have any non-Spark dependencies. Otherwise, the simplest
thing is to build a jar with dependencies (typically using the assembly or
shade plugins).





On Wed, Jul 8, 2015 at 9:38 AM, Prateek .  wrote:

>  Hi
>
>
>
> I am a beginner to Scala and Spark. I am trying to set up an Eclipse
> environment to develop a Spark program in Scala, then take its jar for
> spark-submit.
>
> How shall I start? My task includes setting up Eclipse for Scala and
> Spark, getting dependencies resolved, and building the project using
> Maven/sbt.
>
> Is there any good blog or documentation that I can follow?
>
>
>
> Thanks
>
>  "DISCLAIMER: This message is proprietary to Aricent and is intended
> solely for the use of the individual to whom it is addressed. It may
> contain privileged or confidential information and should not be circulated
> or used for any purpose other than for what it is intended. If you have
> received this message in error, please notify the originator immediately.
> If you are not the intended recipient, you are notified that you are
> strictly prohibited from using, copying, altering, or disclosing the
> contents of this message. Aricent accepts no responsibility for loss or
> damage arising from the use of the information transmitted by this email
> including damage from virus."
>


Re: Want to avoid groupByKey as it's running forever

2015-06-30 Thread Daniel Siegmann
If the number of items is very large, have you considered using
probabilistic counting? The HyperLogLogPlus class from stream-lib might be
suitable.
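
A rough sketch of how that could look without groupByKey, using aggregateByKey
(keyedItemIds, the precision of 14, and the types are assumptions on my part):

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// keyedItemIds: RDD[(String, Long)] of (groupKey, itemId)
val stats = keyedItemIds.aggregateByKey((0L, new HyperLogLogPlus(14)))(
  (acc, itemId) => { acc._2.offer(itemId); (acc._1 + 1L, acc._2) },
  (a, b) => { a._2.addAll(b._2); (a._1 + b._1, a._2) }
)
// stats: RDD[(String, (Long, HyperLogLogPlus))]
// the Long is the total count; hll.cardinality() gives the approximate unique count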

On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> I have an RDD of type RDD[(String,
> Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
> com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]
>
> Here String is the key, with a list of tuples for that key. I got the above
> RDD after doing a groupByKey. I later want to compute the total number of
> values for a given key and the total number of unique values for the same
> key, and hence I do this
>
> val totalViCount = details.size.toLong
> val uniqueViCount =
> details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>
> How do I do this using reduceByKey?
>
> *Total Code:*
>
>   val groupedDetail: RDD[(String, Iterable[(DetailInputRecord,
> DataRecord)])] = detailInputsToGroup.map {
> case (detailInput, dataRecord) =>
>   val key: StringBuilder = new StringBuilder
>   dimensions.foreach {
> dimension =>
>   key ++= {
>
> Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
>   }
>   }
>   (key.toString, (detailInput, dataRecord))
>   }.groupByKey
>
>   groupedDetail.map {
> case (key, values) => {
>   val valueList = values.toList
>
>   //Compute dimensions // You can skup this
>   val (detailInput, dataRecord) = valueList.head
>   val schema = SchemaUtil.outputSchema(_detail)
>   val detailOutput = new DetailOutputRecord(detail, new
> SessionRecord(schema))
>   DataUtil.populateDimensions(schema, dimensions.toArray,
> detailInput, dataRecord, detailOutput)
>
>
>   val metricsData = metricProviders.flatMap {
> case (className, instance) =>
>   val data = instance.getMetrics(valueList)
>   ReflectionUtil.getData(data,
> _metricProviderMemberNames(className))
>   }
>   metricsData.map { case (k, v) => detailOutput.put(k, v) }
>   val wrap = new AvroKey[DetailOutputRecord](detailOutput)
>   (wrap, NullWritable.get)
> }
>   }
>
>
> //getMetrics:
>   def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
> val totalViCount = details.size.toLong
> val uniqueViCount =
> details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
> new ViewItemCountMetric(totalViCount, uniqueViCount)
>   }
>
>
> I understand that totalViCount can be implemented using reduceByKey. How
> can I implement the total unique count, as I need to have the full list to
> know the unique values?
>
> --
> Deepak
>
>


Re: Unit testing with HiveContext

2015-04-09 Thread Daniel Siegmann
Thanks Ted, using HiveTest as my context worked. It still left a metastore
directory and Derby log in my current working directory though; I manually
added a shutdown hook to delete them and all was well.
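
Roughly what that hook looks like (assumes commons-io is on the test classpath;
"metastore_db" and "derby.log" are the default names Derby creates):

sys.addShutdownHook {
  org.apache.commons.io.FileUtils.deleteQuietly(new java.io.File("metastore_db"))
  org.apache.commons.io.FileUtils.deleteQuietly(new java.io.File("derby.log"))
}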

On Wed, Apr 8, 2015 at 4:33 PM, Ted Yu  wrote:

> Please take a look at
> sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala :
>
>   protected def configure(): Unit = {
> warehousePath.delete()
> metastorePath.delete()
> setConf("javax.jdo.option.ConnectionURL",
>   s"jdbc:derby:;databaseName=$metastorePath;create=true")
> setConf("hive.metastore.warehouse.dir", warehousePath.toString)
>   }
>
> Cheers
>
> On Wed, Apr 8, 2015 at 1:07 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> I am trying to unit test some code which takes an existing HiveContext
>> and uses it to execute a CREATE TABLE query (among other things).
>> Unfortunately I've run into some hurdles trying to unit test this, and I'm
>> wondering if anyone has a good approach.
>>
>> The metastore DB is automatically created in the local directory, but it
>> doesn't seem to be cleaned up afterward. Is there any way to get Spark to
>> clean this up when the context is stopped? Or can I point this to some
>> other location, such as a temp directory?
>>
>> Trying to create a table fails because it is using the default warehouse
>> directory (/user/hive/warehouse). Is there some way to change this without
>> hard-coding a directory in a hive-site.xml; again, I'd prefer to point it
>> to a temp directory so it will be automatically removed. I tried a couple
>> of things that didn't work:
>>
>>- hiveContext.sql("SET hive.metastore.warehouse.dir=/tmp/dir/xyz")
>>- hiveContext.setConf("hive.metastore.warehouse.dir", "/tmp/dir/xyz")
>>
>> Any advice from those who have been here before would be appreciated.
>>
>
>


Unit testing with HiveContext

2015-04-08 Thread Daniel Siegmann
I am trying to unit test some code which takes an existing HiveContext and
uses it to execute a CREATE TABLE query (among other things). Unfortunately
I've run into some hurdles trying to unit test this, and I'm wondering if
anyone has a good approach.

The metastore DB is automatically created in the local directory, but it
doesn't seem to be cleaned up afterward. Is there any way to get Spark to
clean this up when the context is stopped? Or can I point this to some
other location, such as a temp directory?

Trying to create a table fails because it is using the default warehouse
directory (/user/hive/warehouse). Is there some way to change this without
hard-coding a directory in a hive-site.xml; again, I'd prefer to point it
to a temp directory so it will be automatically removed. I tried a couple
of things that didn't work:

   - hiveContext.sql("SET hive.metastore.warehouse.dir=/tmp/dir/xyz")
   - hiveContext.setConf("hive.metastore.warehouse.dir", "/tmp/dir/xyz")

Any advice from those who have been here before would be appreciated.


Re: Setup Spark jobserver for Spark SQL

2015-04-02 Thread Daniel Siegmann
You shouldn't need to do anything special. Are you using a named context?
I'm not sure those work with SparkSqlJob.

By the way, there is a forum on Google groups for the Spark Job Server:
https://groups.google.com/forum/#!forum/spark-jobserver

On Thu, Apr 2, 2015 at 5:10 AM, Harika  wrote:

> Hi,
>
> I am trying to use Spark Jobserver (
> https://github.com/spark-jobserver/spark-jobserver ) for running Spark
> SQL jobs.
>
> I was able to start the server but when I run my application(my Scala class
> which extends SparkSqlJob), I am getting the following as response:
>
> {
>   "status": "ERROR",
>   "result": "Invalid job type for this context"
> }
>
> Can anyone suggest what is going wrong or provide a detailed procedure
> for setting up jobserver for SparkSQL?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Setup-Spark-jobserver-for-Spark-SQL-tp22352.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Partitioning Dataset and Using Reduce in Apache Spark

2015-03-13 Thread Daniel Siegmann
On Thu, Mar 12, 2015 at 1:45 AM,  wrote:

>
> In your response you say “When you call reduce and *similar *methods,
> each partition can be reduced in parallel. Then the results of that can be
> transferred across the network and reduced to the final result”. By similar
> methods do you mean all actions within spark? Does transfer of data from
> worker nodes to driver nodes happen only when an action is performed?
>

There is reduceByKey and some others that are in a more generalized form.
Other transformations and actions may work somewhat differently, but they
will generally be parallelized as much as is possible.

If you look at the UI when the job is running, you will see some number of
tasks. Each task corresponds to a single partition.

Not all actions cause data to be transferred from worker nodes to the
driver (there is only one driver node) - saveAsTextFile, for example. In any case,
no processing is done and no data is transferred anywhere until an action
is invoked, since transformations are lazy.


> I am assuming that in Spark, you typically have a set of transformations
> followed by some sort of action. The RDD is partitioned and sent to
> different worker nodes(assuming this a cluster setup), the transformations
> are applied to the RDD partitions at the various worker nodes, and then
> when an action is performed, you perform the action on the worker nodes and
> then aggregate the partial results at the driver and then perform another
> reduction at the driver to obtain the overall results. I would also assume
> that deciding whether the action should be done on a worker node, depends
> on the type of action. For example, performing reduce at the worker node
> makes sense, while it doesn't make sense to save the file at the worker
> node.  Does that sound correct, or am I misinterpreting something?
>

On the contrary, saving of files would typically be done at the worker
node. If you are handling anything approaching "Big Data" you will be
keeping it in a distributed store (typically HDFS, though it depends on
your use case), and each worker will write into this store. For example, if
you saveAsTextFile you will see multiple "part-*" files in the output
directory from separate partitions (don't worry about combining them for
future processing, sc.textFile can read them all in as a single RDD with
the data appropriately partitioned).

Some actions do pull data back to the driver - collect, for example. You
need to be careful when using such methods that you can be sure the amount
of data won't be too large for your driver.

In general you should avoid pulling any data back to the driver or doing
any processing on the driver as that will not scale. In some cases it may
be useful; for example if you wanted to join a large data set with a small
data set, it might perform better to collect the small data set and
broadcast it.

Take this for example:

sc.textFile(inputPath).map(...).filter(...).saveAsTextFile(outputPath)

None of that will execute on the driver. The map and filter will be
collapsed into a single stage (which you will see in the UI). Each worker will
prefer to read its local data (but data may be transferred if there's none
locally to work on), transform the data, and write it out.

If you had this for example:

sc.textFile(inputPath).map(...).reduceByKey(...).saveAsTextFile(outputPath)


You will have two stages, since reduceByKey causes a shuffle - data will be
transferred across the network and formed into a new set of partitions. But
after the reduce, each worker will still save the data it is responsible
for. (You can provide a custom partitioner, but don't do that unless you
feel you have a good reason to do so.)

Hope that helps.


Re: Which is more efficient : first join three RDDs and then do filtering or vice versa?

2015-03-12 Thread Daniel Siegmann
Join causes a shuffle (sending data across the network). I expect it will
be better to filter before you join, so you reduce the amount of data which
is sent across the network.

Note this would be true for *any* transformation which causes a shuffle. It
would not be true if you're combining RDDs with union, since that doesn't
cause a shuffle.
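
As an illustration only (threshold and wantedKeys are made-up stand-ins for
whatever your real predicates are):

val filteredA = rddA.filter { case (_, value) => value > threshold }
val filteredB = rddB.filter { case (key, _) => wantedKeys.contains(key) }
val joined = filteredA.join(filteredB)  // only the filtered records get shuffled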

On Thu, Mar 12, 2015 at 11:04 AM, shahab  wrote:

> Hi,
>
> Probably this question is already answered sometime in the mailing list,
> but i couldn't find it. Sorry for posting this again.
>
> I need to to join and apply filtering on three different RDDs, I just
> wonder which of the following alternatives are more efficient:
> 1- first joint all three RDDs and then do  filtering on resulting joint
> RDD   or
> 2- Apply filtering on each individual RDD and then join the resulting RDDs
>
>
> Or probably there is no difference due to lazy evaluation and under
> beneath Spark optimisation?
>
> best,
> /Shahab
>


Re: Partitioning Dataset and Using Reduce in Apache Spark

2015-03-05 Thread Daniel Siegmann
An RDD is a Resilient *Distributed* Data set. The partitioning and
distribution of the data happens in the background. You'll occasionally
need to concern yourself with it (especially to get good performance), but
from an API perspective it's mostly invisible (some methods do allow you to
specify a number of partitions).

When you call sc.textFile(myPath) or similar, you get an RDD. That RDD will
be composed of a bunch of partitions, but you don't really need to worry
about that. The partitioning will be based on how the data is stored. When
you call a method that causes a shuffle (such as reduce), the data is
repartitioned into a number of partitions based on your default parallelism
setting (which IIRC is based on your number of cores if you haven't set it
explicitly).

When you call reduce and similar methods, each partition can be reduced in
parallel. Then the results of that can be transferred across the network
and reduced to the final result. *You supply the function and Spark handles
the parallel execution of that function*.
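
A tiny example of that division of labor (the numbers are arbitrary):

// each of the 8 partitions sums its own elements in parallel,
// then the 8 partial sums are combined into the final result
val total = sc.parallelize(1 to 1000000, 8).reduce(_ + _)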

I hope this helps clear up your misconceptions. You might also want to
familiarize yourself with the collections API in Java 8 (or Scala, or
Python, or pretty much any other language with lambda expressions), since
RDDs are meant to have an API that feels similar.

On Thu, Mar 5, 2015 at 9:45 AM, raggy  wrote:

> I am trying to use Apache spark to load up a file, and distribute the file
> to
> several nodes in my cluster and then aggregate the results and obtain them.
> I don't quite understand how to do this.
>
> From my understanding the reduce action enables Spark to combine the
> results
> from different nodes and aggregate them together. Am I understanding this
> correctly?
>
> From a programming perspective, I don't understand how I would code this
> reduce function.
>
> How exactly do I partition the main dataset into N pieces and ask them to
> be
> parallel processed by using a list of transformations?
>
> reduce is supposed to take in two elements and a function for combining
> them. Are these 2 elements supposed to be RDDs from the context of Spark or
> can they be any type of element? Also, if you have N different partitions
> running parallel, how would reduce aggregate all their results into one
> final result(since the reduce function aggregates only 2 elements)?
>
> Also, I don't understand this example. The example from the spark website
> uses reduce, but I don't see the data being processed in parallel. So, what
> is the point of the reduce? If I could get a detailed explanation of the
> loop in this example, I think that would clear up most of my questions.
>
> class ComputeGradient extends Function<DataPoint, Vector> {
>   private Vector w;
>   ComputeGradient(Vector w) { this.w = w; }
>   public Vector call(DataPoint p) {
> return p.x.times(p.y * (1 / (1 + Math.exp(w.dot(p.x))) - 1));
>   }
> }
>
> JavaRDD<DataPoint> points = spark.textFile(...).map(new
> ParsePoint()).cache();
> Vector w = Vector.random(D); // current separating plane
> for (int i = 0; i < ITERATIONS; i++) {
>   Vector gradient = points.map(new ComputeGradient(w)).reduce(new
> AddVectors());
>   w = w.subtract(gradient);
> }
> System.out.println("Final separating plane: " + w);
>
> Also, I have been trying to find the source code for reduce from the Apache
> Spark Github, but the source is pretty huge and I haven't been able to
> pinpoint it. Could someone please direct me towards which file I could find
> it in?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Partitioning-Dataset-and-Using-Reduce-in-Apache-Spark-tp21933.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SparkSQL production readiness

2015-03-02 Thread Daniel Siegmann
OK, good to know data frames are still experimental. Thanks Michael.

On Mon, Mar 2, 2015 at 12:37 PM, Michael Armbrust 
wrote:

> We have been using Spark SQL in production for our customers at Databricks
> for almost a year now.  We also know of some very large production
> deployments elsewhere.  It is still a young project, but I wouldn't call it
> alpha.
>
> The primary changes to the API are the addition of the DataFrame
> interface, which is an expansion of the DSL that was already there.  All of
> the SQL / HiveQL stuff remains unchanged, as well as the internal execution
> engine.  DataFrames are still marked experimental, since as you said, we
> should let people use them before cementing them.
>


Re: SparkSQL production readiness

2015-03-02 Thread Daniel Siegmann
I thought removing the alpha tag just meant the API was stable? Speaking of
which, aren't there major changes to the API coming in 1.3? Why are you
marking the API as stable before these changes have been widely used?


On Sat, Feb 28, 2015 at 5:17 PM, Michael Armbrust 
wrote:

> We are planning to remove the alpha tag in 1.3.0.
>
> On Sat, Feb 28, 2015 at 12:30 AM, Wang, Daoyuan 
> wrote:
>
>>  Hopefully  the alpha tag will be remove in 1.4.0, if the community can
>> review code a little bit faster :P
>>
>>
>>
>> Thanks,
>>
>> Daoyuan
>>
>>
>>
>> *From:* Ashish Mukherjee [mailto:ashish.mukher...@gmail.com]
>> *Sent:* Saturday, February 28, 2015 4:28 PM
>> *To:* user@spark.apache.org
>> *Subject:* SparkSQL production readiness
>>
>>
>>
>> Hi,
>>
>>
>>
>> I am exploring SparkSQL for my purposes of performing large relational
>> operations across a cluster. However, it seems to be in alpha right now. Is
>> there any indication when it would be considered production-level? I don't
>> see any info on the site.
>>
>>
>>
>> Regards,
>>
>> Ashish
>>
>
>


Re: Filtering keys after map+combine

2015-02-19 Thread Daniel Siegmann
I'm not sure what your use case is, but perhaps you could use mapPartitions
to reduce across the individual partitions and apply your filtering. Then
you can finish with a reduceByKey.
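
A rough sketch of that idea (the String/Long types and the threshold are
illustrative):

val partiallyReduced = pairs.mapPartitions { iter =>
  val sums = scala.collection.mutable.HashMap.empty[String, Long]
  iter.foreach { case (k, v) => sums(k) = sums.getOrElse(k, 0L) + v }
  // filter the per-partition partial sums before anything goes over the network
  sums.iterator.filter { case (_, partial) => partial >= threshold }
}
val totals = partiallyReduced.reduceByKey(_ + _)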

On Thu, Feb 19, 2015 at 9:21 AM, Debasish Das 
wrote:

> Hi,
>
> Before I send out the keys for network shuffle, in reduceByKey after map +
> combine are done, I would like to filter the keys based on some threshold...
>
> Is there a way to get the key, value after map+combine stages so that I
> can run a filter on the keys ?
>
> Thanks.
> Deb
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Escape commas in file names

2014-12-26 Thread Daniel Siegmann
Thanks for the replies. Hopefully this will not be too difficult to fix.

Why not support multiple paths by overloading the parquetFile method to
take a collection of strings? That way we don't need an appropriate
delimiter.

On Thu, Dec 25, 2014 at 3:46 AM, Cheng, Hao  wrote:

>  I’ve created a jira issue for this
> https://issues.apache.org/jira/browse/SPARK-4967
>
>
>
> Originally, I guess, we wanted to support scanning multiple parquet file
> paths, with those file paths in a single string separated by commas
> internally. However, I didn't find any public example saying we support
> multiple parquet files for the sqlContext.parquetFile API, so we need to
> think about how to support multiple paths in some other way.
>
>
>
> Cheng Hao
>


Escape commas in file names

2014-12-23 Thread Daniel Siegmann
I am trying to load a Parquet file which has a comma in its name. Yes, this
is a valid file name in HDFS. However, sqlContext.parquetFile interprets
this as a comma-separated list of parquet files.

Is there any way to escape the comma so it is treated as part of a single
file name?

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Siegmann
Harihar, your question is the opposite of what was asked. In the future,
please start a new thread for new questions.

You want to do a join in your case. The join function does an inner join,
which I think is what you want because you stated your IDs are common in
both RDDs. For other cases you can look at leftOuterJoin, rightOuterJoin,
and cogroup (alias groupWith). These are all on PairRDDFunctions (in Scala)
[1]
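
For the (ID, count) and (ID, name) RDDs described below, a sketch (types taken
from that snippet):

// x: RDD[(Long, Int)] of (ID, count), y: RDD[(Long, String)] of (ID, name)
val result = x.join(y).map { case (id, (count, name)) => (id, name, count) }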

Alternatively, if one of your RDDs is small, you could collect it and
broadcast the collection, using it in functions on the other RDD. But I
don't think this will apply in your case because the number of records will
be equal in each RDD. [2]

[1]
http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs
[2]
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

On Thu, Nov 20, 2014 at 4:53 PM, Harihar Nahak 
wrote:

> I've got a similar type of issue; I want to join two different types of RDD
> into one RDD
>
> file1.txt content (ID, counts)
> val x: RDD[(Long, Int)] = sc.textFile("file1.txt").map(line =>
> line.split(",")).map(row => (row(0).toLong, row(1).toInt))
> [(4407 ,40),
> (2064, 38),
> (7815 ,10),
> (5736,17),
> (8031,3)]
>
> Second RDD from : file2.txt contains (ID, name)
> val y: RDD[(Long, String)]{where ID is common in both the RDDs}
> [(4407 ,Jhon),
> (2064, Maria),
> (7815 ,Casto),
> (5736,Ram),
> (8031,XYZ)]
>
> and I'm expecting result should be like this : [(ID, Name, Count)]
> [(4407 ,Jhon, 40),
> (2064, Maria, 38),
> (7815 ,Casto, 10),
> (5736,Ram, 17),
> (8031,XYZ, 3)]
>
>
> Any help will really appreciate. Thanks
>
>
>
>
> On 21 November 2014 09:18, dsiegmann [via Apache Spark User List] <[hidden
> email] <http://user/SendEmail.jtp?type=node&node=19423&i=0>> wrote:
>
>> You want to use RDD.union (or SparkContext.union for many RDDs). These
>> don't join on a key. Union doesn't really do anything itself, so it is low
>> overhead. Note that the combined RDD will have all the partitions of the
>> original RDDs, so you may want to coalesce after the union.
>>
>> val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
>> val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
>> val z = x.union(y)
>>
>> z.collect
>> res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))
>>
>>
>> On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith wrote:
>>
>>> Say I have two RDDs with the following values
>>>
>>> x = [(1, 3), (2, 4)]
>>>
>>> and
>>>
>>> y = [(3, 5), (4, 7)]
>>>
>>> and I want to have
>>>
>>> z = [(1, 3), (2, 4), (3, 5), (4, 7)]
>>>
>>> How can I achieve this. I know you can use outerJoin followed by map to
>>> achieve this, but is there a more direct way for this.
>>>
>>
>>
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 54 W 40th St, New York, NY 10018
>> E: [hidden email]
>> W: www.velos.io
>>
>>
>
>
>
> --
> Regards,
> Harihar Nahak
> BigData Developer
> Wynyard
> [hidden email] |
> Extn: 8019
>  --Harihar
>
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to join two RDDs with mutually exclusive keys

2014-11-20 Thread Daniel Siegmann
You want to use RDD.union (or SparkContext.union for many RDDs). These
don't join on a key. Union doesn't really do anything itself, so it is low
overhead. Note that the combined RDD will have all the partitions of the
original RDDs, so you may want to coalesce after the union.

val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
val z = x.union(y)

z.collect
res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))


On Thu, Nov 20, 2014 at 3:06 PM, Blind Faith 
wrote:

> Say I have two RDDs with the following values
>
> x = [(1, 3), (2, 4)]
>
> and
>
> y = [(3, 5), (4, 7)]
>
> and I want to have
>
> z = [(1, 3), (2, 4), (3, 5), (4, 7)]
>
> How can I achieve this. I know you can use outerJoin followed by map to
> achieve this, but is there a more direct way for this.
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: PairRDDFunctions with Tuple2 subclasses

2014-11-19 Thread Daniel Siegmann
Casting to Tuple2 is easy, but the output of reduceByKey is presumably a
new Tuple2 instance so I'll need to map those to new instances of my class.
Not sure how much overhead will be added by the creation of those new
instances.

If I do that everywhere in my code though, it will make the code really
messy. That is why I was thinking of creating a wrapper which looks like
PairRDDFunctions which would cast to a pair RDD, delegate to
PairRDDFunctions, and then convert back to my class.

I was kinda hoping a Scala wizard would come along with some black magic
though.
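
For example, here is a rough sketch of that cast-and-convert-back idea (the
Score class and totals function are made up for illustration, not from my
actual code):

import org.apache.spark.SparkContext._ // pair RDD implicits (reduceByKey, etc.)
import org.apache.spark.rdd.RDD

// Illustrative Tuple2 subclass.
class Score(user: String, points: Int) extends Tuple2[String, Int](user, points)

def totals(scores: RDD[Score]): RDD[Score] = {
  // Widen to a plain pair RDD (safe at runtime thanks to erasure) so the
  // implicit conversion to PairRDDFunctions applies...
  val pairs = scores.asInstanceOf[RDD[(String, Int)]]
  // ...then map the plain Tuple2 results back into the subclass.
  pairs.reduceByKey(_ + _).map { case (user, points) => new Score(user, points) }
}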

On Wed, Nov 19, 2014 at 7:45 PM, Michael Armbrust 
wrote:

> I think you should also be able to get away with casting it back and forth
> in this case using .asInstanceOf.
>
> On Wed, Nov 19, 2014 at 4:39 PM, Daniel Siegmann  > wrote:
>
>> I have a class which is a subclass of Tuple2, and I want to use it with
>> PairRDDFunctions. However, I seem to be limited by the invariance of T
>> in RDD[T] (see SPARK-1296
>> <https://issues.apache.org/jira/browse/SPARK-1296>).
>>
>> My Scala-fu is weak: the only way I could think to make this work would
>> be to define my own equivalent of PairRDDFunctions which works with my
>> class, does type conversions to Tuple2, and delegates to PairRDDFunctions
>> .
>>
>> Does anyone know a better way? Anyone know if there will be a significant
>> performance penalty with that approach?
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 54 W 40th St, New York, NY 10018
>> E: daniel.siegm...@velos.io W: www.velos.io
>>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


PairRDDFunctions with Tuple2 subclasses

2014-11-19 Thread Daniel Siegmann
I have a class which is a subclass of Tuple2, and I want to use it with
PairRDDFunctions. However, I seem to be limited by the invariance of T in
RDD[T] (see SPARK-1296 <https://issues.apache.org/jira/browse/SPARK-1296>).

My Scala-fu is weak: the only way I could think to make this work would be
to define my own equivalent of PairRDDFunctions which works with my class,
does type conversions to Tuple2, and delegates to PairRDDFunctions.

Does anyone know a better way? Anyone know if there will be a significant
performance penalty with that approach?

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to assign consecutive numeric id to each row based on its content?

2014-11-18 Thread Daniel Siegmann
I think zipWithIndex is zero-based, so if you want 1 to N, you'll need to
increment them like so:

val r2 = r1.keys.distinct().zipWithIndex().mapValues(_ + 1)

If the number of distinct keys is relatively small, you might consider
collecting them into a map and broadcasting them rather than using a join,
like so:

val keyIndices = sc.broadcast(r2.collect().toMap)
val r3 = r1.map { case (k, v) => (keyIndices.value(k), v) }

On Tue, Nov 18, 2014 at 8:16 AM, Cheng Lian  wrote:

>  A not so efficient way can be this:
>
> val r0: RDD[OriginalRow] = ...
> val r1 = r0.keyBy(row => extractKeyFromOriginalRow(row))
> val r2 = r1.keys.distinct().zipWithIndex()
> val r3 = r2.join(r1).values
>
> On 11/18/14 8:54 PM, shahab wrote:
>
>   Hi,
>
>  In my spark application, I am loading some rows from database into Spark
> RDDs
> Each row has several fields, and a string key. Due to my requirements I
> need to work with consecutive numeric ids (starting from 1 to N, where N is
> the number of unique keys) instead of string keys . Also several rows can
> have same string key .
>
>  In spark context, how I can map each row into (Numeric_Key, OriginalRow)
> as map/reduce  tasks such that rows with same original string key get same
> numeric consecutive key?
>
>  Any hints?
>
>  best,
> /Shahab
>
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-17 Thread Daniel Siegmann
I'm not aware of any such mechanism.

On Mon, Nov 17, 2014 at 2:55 PM, Pala M Muthaia  wrote:

> Hi Daniel,
>
> Yes that should work also. However, is it possible to setup so that each
> RDD has exactly one partition, without repartitioning (and thus incurring
> extra cost)? Is there a mechanism similar to MR where we can ensure each
> partition is assigned some amount of data by size, by setting some block
> size parameter?
>
>
>
> On Thu, Nov 13, 2014 at 1:05 PM, Daniel Siegmann  > wrote:
>
>> On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia <
>> mchett...@rocketfuelinc.com> wrote
>>
>>>
>>> No i don't want separate RDD because each of these partitions are being
>>> processed the same way (in my case, each partition corresponds to HBase
>>> keys belonging to one region server, and i will do HBase lookups). After
>>> that i have aggregations too, hence all these partitions should be in the
>>> same RDD. The reason to follow the partition structure is to limit
>>> concurrent HBase lookups targeting a single region server.
>>>
>>
>> Neither of these is necessarily a barrier to using separate RDDs. You can
>> define the function you want to use and then pass it to multiple map
>> methods. Then you could union all the RDDs to do your aggregations. For
>> example, it might look something like this:
>>
>> val paths: Seq[String] = ... // the paths to the files you want to load
>> def myFunc(t: T) = ... // the function to apply to every record
>> val rdds = paths.map { path =>
>>   sc.textFile(path).map(myFunc)
>> }
>> val completeRdd = sc.union(rdds)
>>
>> Does that make any sense?
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 54 W 40th St, New York, NY 10018
>> E: daniel.siegm...@velos.io W: www.velos.io
>>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: RDD.aggregate versus accumulables...

2014-11-17 Thread Daniel Siegmann
You should *never* use accumulators for this purpose because you may get
incorrect answers. Accumulators can count the same thing multiple times -
you cannot rely upon the correctness of the values they compute. See
SPARK-732 <https://issues.apache.org/jira/browse/SPARK-732> for more info.
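
For example, here is a minimal sketch of a one-pass sum and count with
aggregate (assuming sc is your SparkContext, e.g. in the spark-shell):

val values = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
// Zero value is a (sum, count) pair; the first function folds an element into
// a partition's accumulator, the second merges accumulators across partitions.
val (sum, count) = values.aggregate((0.0, 0L))(
  (acc, x) => (acc._1 + x, acc._2 + 1L),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
val mean = sum / count // 2.5

Because aggregate is an action whose per-partition results are taken from a
single successful task, re-executed tasks don't get double counted the way
accumulator updates can be.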

On Sun, Nov 16, 2014 at 10:06 PM, Segerlind, Nathan L <
nathan.l.segerl...@intel.com> wrote:

>  Hi All.
>
>
>
> I am trying to get my head around why using accumulators and accumulables
> seems to be the most recommended method for accumulating running sums,
> averages, variances and the like, whereas the aggregate method seems to me
> to be the right one. I have no performance measurements as of yet, but it
> seems that aggregate is simpler and more intuitive (And it does what one
> might expect an accumulator to do) whereas the accumulators and
> accumulables seem to have some extra complications and overhead.
>
>
>
> So…
>
>
>
> What’s the real difference between an accumulator/accumulable and
> aggregating an RDD? When is one method of aggregation preferred over the
> other?
>
>
>
> Thanks,
>
> Nate
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How do you force a Spark Application to run in multiple tasks

2014-11-17 Thread Daniel Siegmann
I've never used Mesos, sorry.

On Fri, Nov 14, 2014 at 5:30 PM, Steve Lewis  wrote:

> The cluster runs Mesos and I can see the tasks in the Mesos UI but most
> are not doing much - any hints about that UI
>
> On Fri, Nov 14, 2014 at 11:39 AM, Daniel Siegmann <
> daniel.siegm...@velos.io> wrote:
>
>> Most of the information you're asking for can be found on the Spark web
>> UI (see here <http://spark.apache.org/docs/1.1.0/monitoring.html>). You
>> can see which tasks are being processed by which nodes.
>>
>> If you're using HDFS and your file size is smaller than the HDFS block
>> size you will only have one partition (remember, there is exactly one task
>> for each partition in a stage). If you want to force it to have more
>> partitions, you can call RDD.repartition(numPartitions). Note that this
>> will introduce a shuffle you wouldn't otherwise have.
>>
>> Also make sure your job is allocated more than one core in your cluster
>> (you can see this on the web UI).
>>
>> On Fri, Nov 14, 2014 at 2:18 PM, Steve Lewis 
>> wrote:
>>
>>>  I have instrumented word count to track how many machines the code runs
>>> on. I use an accumulator to maintain a Set of MacAddresses. I find that
>>> everything is done on a single machine. This is probably optimal for word
>>> count but not the larger problems I am working on.
>>> How do I force processing to be split into multiple tasks? How do I
>>> access the task and attempt numbers to track which processing happens in
>>> which attempt? Also, I am using MacAddress to determine which machine is
>>> running the code.
>>> As far as I can tell a simple word count is running in one thread on
>>> one machine and the remainder of the cluster does nothing.
>>> This is consistent with tests where I write to stdout from functions and
>>> see little output on most machines in the cluster.
>>>
>>>
>>
>>
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 54 W 40th St, New York, NY 10018
>> E: daniel.siegm...@velos.io W: www.velos.io
>>
>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: How do you force a Spark Application to run in multiple tasks

2014-11-14 Thread Daniel Siegmann
Most of the information you're asking for can be found on the Spark web UI (see
here <http://spark.apache.org/docs/1.1.0/monitoring.html>). You can see
which tasks are being processed by which nodes.

If you're using HDFS and your file size is smaller than the HDFS block size
you will only have one partition (remember, there is exactly one task for
each partition in a stage). If you want to force it to have more
partitions, you can call RDD.repartition(numPartitions). Note that this
will introduce a shuffle you wouldn't otherwise have.

Also make sure your job is allocated more than one core in your cluster
(you can see this on the web UI).
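
For example (the path here is made up):

val lines = sc.textFile("hdfs:///data/words.txt")
println(lines.partitions.length) // likely 1 if the file fits in a single HDFS block
val split = lines.repartition(8) // forces 8 partitions, at the cost of a shuffle
println(split.partitions.length) // 8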

On Fri, Nov 14, 2014 at 2:18 PM, Steve Lewis  wrote:

>  I have instrumented word count to track how many machines the code runs
> on. I use an accumulator to maintain a Set of MacAddresses. I find that
> everything is done on a single machine. This is probably optimal for word
> count but not the larger problems I am working on.
> How do I force processing to be split into multiple tasks? How do I access
> the task and attempt numbers to track which processing happens in which
> attempt? Also, I am using MacAddress to determine which machine is running the
> code.
> As far as I can tell a simple word count is running in one thread on one
> machine and the remainder of the cluster does nothing.
> This is consistent with tests where I write to stdout from functions and
> see little output on most machines in the cluster.
>
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-13 Thread Daniel Siegmann
On Thu, Nov 13, 2014 at 3:24 PM, Pala M Muthaia  wrote

>
> No i don't want separate RDD because each of these partitions are being
> processed the same way (in my case, each partition corresponds to HBase
> keys belonging to one region server, and i will do HBase lookups). After
> that i have aggregations too, hence all these partitions should be in the
> same RDD. The reason to follow the partition structure is to limit
> concurrent HBase lookups targeting a single region server.
>

Neither of these is necessarily a barrier to using separate RDDs. You can
define the function you want to use and then pass it to multiple map
methods. Then you could union all the RDDs to do your aggregations. For
example, it might look something like this:

val paths: Seq[String] = ... // the paths to the files you want to load
def myFunc(t: T) = ... // the function to apply to every record
val rdds = paths.map { path =>
  sc.textFile(path).map(myFunc)
}
val completeRdd = sc.union(rdds)

Does that make any sense?

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Accessing RDD within another RDD map

2014-11-13 Thread Daniel Siegmann
You cannot reference an RDD within a closure passed to another RDD. Your
code should instead look like this:

val rdd1 = sc.parallelize(1 to 10)
val rdd2 = sc.parallelize(11 to 20)
val rdd2Count = rdd2.count
rdd1.map{ i =>
 rdd2Count
}
.foreach(println(_))

You should also note that even if your original code did work, you would be
re-counting rdd2 for every single record in rdd1. Unless your RDD is cached
/ persisted, the count will be recomputed every time it is called. So that
would be extremely inefficient.


On Thu, Nov 13, 2014 at 2:28 PM, Simone Franzini 
wrote:

> The following code fails with NullPointerException in RDD class on the
> count function:
>
> val rdd1 = sc.parallelize(1 to 10)
> val rdd2 = sc.parallelize(11 to 20)
> rdd1.map{ i =>
>  rdd2.count
> }
> .foreach(println(_))
>
> The same goes for any other action I am trying to perform inside the map
> statement. I am failing to understand what I am doing wrong.
> Can anyone help with this?
>
> Thanks,
> Simone Franzini, PhD
>
> http://www.linkedin.com/in/simonefranzini
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-13 Thread Daniel Siegmann
I believe Rishi is correct. I wouldn't rely on that though - all it would
take is for one file to exceed the block size and you'd be setting yourself
up for pain. Also, if your files are small - small enough to fit in a
single record - you could use SparkContext.wholeTextFiles.
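
For example (the directory path is made up):

// One record per file: (filePath, fileContents), regardless of block layout.
val files = sc.wholeTextFiles("hdfs:///input/small-files")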

On Thu, Nov 13, 2014 at 10:11 AM, Rishi Yadav  wrote:

> If your data is in hdfs and you are reading as textFile and each file is
> less than block size, my understanding is it would always have one
> partition per file.
>
>
> On Thursday, November 13, 2014, Daniel Siegmann 
> wrote:
>
>> Would it make sense to read each file in as a separate RDD? This way you
>> would be guaranteed the data is partitioned as you expected.
>>
>> Possibly you could then repartition each of those RDDs into a single
>> partition and then union them. I think that would achieve what you expect.
>> But it would be easy to accidentally screw this up (have some operation
>> that causes a shuffle), so I think you're better off just leaving them as
>> separate RDDs.
>>
>> On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia <
>> mchett...@rocketfuelinc.com> wrote:
>>
>>> Hi,
>>>
>>> I have a set of input files for a spark program, with each file
>>> corresponding to a logical data partition. What is the API/mechanism to
>>> assign each input file (or a set of files) to a spark partition, when
>>> initializing RDDs?
>>>
>>> When i create a spark RDD pointing to the directory of files, my
>>> understanding is it's not guaranteed that each input file will be treated
>>> as separate partition.
>>>
>>> My job semantics require that the data is partitioned, and i want to
>>> leverage the partitioning that has already been done, rather than
>>> repartitioning again in the spark job.
>>>
>>> I tried to lookup online but haven't found any pointers so far.
>>>
>>>
>>> Thanks
>>> pala
>>>
>>
>>
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 54 W 40th St, New York, NY 10018
>> E: daniel.siegm...@velos.io W: www.velos.io
>>
>
>
> --
> - Rishi
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Assigning input files to spark partitions

2014-11-13 Thread Daniel Siegmann
Would it make sense to read each file in as a separate RDD? This way you
would be guaranteed the data is partitioned as you expected.

Possibly you could then repartition each of those RDDs into a single
partition and then union them. I think that would achieve what you expect.
But it would be easy to accidentally screw this up (have some operation
that causes a shuffle), so I think you're better off just leaving them as
separate RDDs.
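
For example, a rough sketch of that approach (paths are made up):

val paths = Seq("hdfs:///input/part-a", "hdfs:///input/part-b")
// One RDD per file, squeezed into a single partition each, then unioned.
// The union keeps its inputs' partitions, so you end up with one partition
// per input file - until some later operation shuffles the data.
val perFile = paths.map(path => sc.textFile(path).coalesce(1))
val combined = sc.union(perFile)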

On Wed, Nov 12, 2014 at 10:27 PM, Pala M Muthaia <
mchett...@rocketfuelinc.com> wrote:

> Hi,
>
> I have a set of input files for a spark program, with each file
> corresponding to a logical data partition. What is the API/mechanism to
> assign each input file (or a set of files) to a spark partition, when
> initializing RDDs?
>
> When i create a spark RDD pointing to the directory of files, my
> understanding is it's not guaranteed that each input file will be treated
> as separate partition.
>
> My job semantics require that the data is partitioned, and i want to
> leverage the partitioning that has already been done, rather than
> repartitioning again in the spark job.
>
> I tried to lookup online but haven't found any pointers so far.
>
>
> Thanks
> pala
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

54 W 40th St, New York, NY 10018
E: daniel.siegm...@velos.io W: www.velos.io


Re: Is there a way to clone a JavaRDD without persisting it

2014-11-12 Thread Daniel Siegmann
As far as I know you basically have two options: let partitions be
recomputed (possibly caching / persisting memory only), or persist to disk
(and memory) and suffer the cost of writing to disk. The question is which
will be more expensive in your case. My experience is you're better off
letting things be recomputed to begin with and then trying out some
persisting.

It seems to me Spark would benefit from allowing actions to be batched up
and then having Spark execute them intelligently. That is, each partition
could be processed by multiple actions after being computed. But I don't
believe there's any way to achieve this currently.

If anyone does have a way to achieve this, I'd love to hear it. :-)
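
For example, here is a minimal sketch of the persist-then-inspect option (the
input path and transformations are made up):

import org.apache.spark.storage.StorageLevel

val intermediate = sc.textFile("hdfs:///input/data.txt")
  .map(_.toUpperCase)
  .persist(StorageLevel.MEMORY_AND_DISK) // spill to disk instead of dropping partitions
println(intermediate.count()) // inspect the size...
val next = intermediate.filter(_.nonEmpty) // ...and keep processing without a full recompute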

On Wed, Nov 12, 2014 at 1:23 AM, Steve Lewis  wrote:

>  In my problem I have a number of intermediate JavaRDDs and would like to
> be able to look at their sizes without destroying the RDD for subsequent
> processing. persist will do this, but these are big and persist seems
> expensive, and I am unsure of which StorageLevel is needed. Is there a way
> to clone a JavaRDD, or does anyone have good ideas on how to do this?
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Custom persist or cache of RDD?

2014-11-11 Thread Daniel Siegmann
But that requires an (unnecessary) load from disk.

I have run into this same issue, where we want to save intermediate results
but continue processing. The cache / persist feature of Spark doesn't seem
designed for this case. Unfortunately I'm not aware of a better solution
with the current version of Spark.
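
For what it's worth, the workaround Sean describes would look roughly like
this (paths are made up, and the map is just a stand-in for the real B -> C
step):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Assume B.saveAsParquetFile("hdfs:///checkpoints/b.parquet") already ran.
// Rebuild the downstream steps from the on-disk copy, so a failure later in
// the flow only recomputes from here instead of all the way back to A.
val bFromDisk = sqlContext.parquetFile("hdfs:///checkpoints/b.parquet")
val c = bFromDisk.map(identity)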

On Mon, Nov 10, 2014 at 5:15 PM, Sean Owen  wrote:

> Well you can always create C by loading B from disk, and likewise for
> E / D. No need for any custom procedure.
>
> On Mon, Nov 10, 2014 at 7:33 PM, Benyi Wang  wrote:
> > When I have a multi-step process flow like this:
> >
> > A -> B -> C -> D -> E -> F
> >
> > I need to store B and D's results into parquet files
> >
> > B.saveAsParquetFile
> > D.saveAsParquetFile
> >
> > If I don't cache/persist any step, spark might recompute from A,B,C,D
> and E
> > if something is wrong in F.
> >
> > Of course, I'd better cache all steps if I have enough memory to avoid
> this
> > re-computation, or persist result to disk. But persisting B and D seems
> > duplicate with saving B and D as parquet files.
> >
> > I'm wondering if spark can restore B and D from the parquet files using a
> > customized persist and restore procedure?
> >
> >
> >
> >
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: SparkContext.stop() ?

2014-10-31 Thread Daniel Siegmann
It is used to shut down the context when you're done with it, but if you're
using a context for the lifetime of your application I don't think it
matters.

I use this in my unit tests, because they start up local contexts and you
can't have multiple local contexts open so each test must stop its context
when it's done.

On Fri, Oct 31, 2014 at 11:12 AM, ll  wrote:

> what is it for?  when do we call it?
>
> thanks!
>
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Unit testing: Mocking out Spark classes

2014-10-16 Thread Daniel Siegmann
Mocking these things is difficult; executing your unit tests in a local
Spark context is preferred, as recommended in the programming guide
<http://spark.apache.org/docs/latest/programming-guide.html#unit-testing>.
I know this may not technically be a unit test, but it is hopefully close
enough.

You can load your test data using SparkContext.parallelize and retrieve the
data (for verification) using RDD.collect.
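
For example, here is a bare-bones ScalaTest sketch of that pattern (the names
are mine, not from the original question):

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class WordCountSpec extends FunSuite with BeforeAndAfterAll {
  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  }

  override def afterAll(): Unit = {
    sc.stop() // so the next suite can create its own local context
  }

  test("counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a")).countByValue()
    assert(counts("a") == 2)
  }
}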

On Thu, Oct 16, 2014 at 9:07 AM, Saket Kumar  wrote:

> Hello all,
>
> I am trying to unit test my classes involved my Spark job. I am trying to
> mock out the Spark classes (like SparkContext and Broadcast) so that I can
> unit test my classes in isolation. However I have realised that these are
> classes instead of traits. My first question is why?
>
> It is quite hard to mock out classes using ScalaTest+ScalaMock as the
> classes which need to be mocked out need to be annotated with
> org.scalamock.annotation.mock as per
> http://www.scalatest.org/user_guide/testing_with_mock_objects#generatedMocks.
> I cannot do that in my case as I am trying to mock out the spark classes.
>
> Am I missing something? Is there a better way to do this?
>
> val sparkContext = mock[SparkInteraction]
> val trainingDatasetLoader = mock[DatasetLoader]
> val broadcastTrainingDatasetLoader = mock[Broadcast[DatasetLoader]]
> def transformerFunction(source: Iterator[(HubClassificationData,
> String)]): Iterator[String] = {
>   source.map(_._2)
> }
> val classificationResultsRDD = mock[RDD[String]]
> val classificationResults = Array("","","")
> val inputRDD = mock[RDD[(HubClassificationData, String)]]
>
> inSequence{
>   inAnyOrder{
> (sparkContext.broadcast[DatasetLoader]
> _).expects(trainingDatasetLoader).returns(broadcastTrainingDatasetLoader)
>   }
> }
>
> val sparkInvoker = new SparkJobInvoker(sparkContext,
> trainingDatasetLoader)
>
> when(inputRDD.mapPartitions(transformerFunction)).thenReturn(classificationResultsRDD)
> sparkInvoker.invoke(inputRDD)
>
> Thanks,
> Saket
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Play framework

2014-10-16 Thread Daniel Siegmann
We execute Spark jobs from a Play application but we don't use
spark-submit. I don't know if you really want to use spark-submit, but if
not you can just create a SparkContext programmatically in your app.

In development I typically run Spark locally. Creating the Spark context is
pretty trivial:

val conf = new SparkConf().setMaster("local[*]").setAppName(s"My Awesome
App")
// call conf.set for any other configuration you want
val sc = new SparkContext(sparkConf)

It is important to keep in mind you cannot have multiple local contexts
(you can create them but you'll get odd errors), so if you are running
things in parallel within your app (even unit tests) you'd need to share a
context in this case. If you are running sequentially you can create a new
local context each time, but you must make sure to call SparkContext.stop()
when you're done.

Running against a cluster is a bit more complicated because you need to add
all your dependency jars. I'm not sure how to get this to work with play run.
I stick to building the app with play dist and then running against the
packaged application, because it very conveniently provides all the
dependencies in a lib folder. Here is some code to load all the paths you
need from the dist:

def libs: Seq[String] = {
  val libDir = play.api.Play.application.getFile("lib")
  logger.info(s"SparkContext will be initialized with libraries from directory $libDir")
  if (libDir.exists) {
    libDir.listFiles().map(_.getCanonicalFile().getAbsolutePath()).filter(_.endsWith(".jar"))
  } else {
    throw new IllegalStateException(s"lib dir is missing: $libDir")
  }
}

Creating the context is similar to above, but with this extra line:

conf.setJars(libs)

I hope this helps. I should note that I don't use play run very much, at
least not for when I'm actually executing Spark jobs. So I'm not sure if
this integrates properly with that. I have unit tests which execute on
Spark and have executed the dist package both locally and on a cluster. To
make working with the dist locally easier, I wrote myself a little shell
script to unzip and run the dist.


On Wed, Oct 15, 2014 at 10:51 PM, Mohammed Guller 
wrote:

>  Hi –
>
>
>
> Has anybody figured out how to integrate a Play application with Spark and
> run it on a Spark cluster using spark-submit script? I have seen some blogs
> about creating a simple Play app and running it locally on a dev machine
> with sbt run command. However, those steps don’t work for Spark-submit.
>
>
>
> If you have figured out how to build and run a Play app with Spark-submit,
> I would appreciate if you could share the steps and the sbt settings for
> your Play app.
>
>
>
> Thanks,
>
> Mohammed
>
>
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Spark inside Eclipse

2014-10-02 Thread Daniel Siegmann
You don't need to do anything special to run in local mode from within
Eclipse. Just create a simple SparkConf and create a SparkContext from
that. I have unit tests which execute on a local SparkContext, and they
work from inside Eclipse as well as SBT.

val conf = new SparkConf().setMaster("local").setAppName(s"Whatever")
val sc = new SparkContext(sparkConf)

Keep in mind you can only have one local SparkContext at a time, otherwise
you will get some weird errors. If you have tests running sequentially,
make sure to close the SparkContext in your tear down method. If tests run
in parallel you'll need to share the SparkContext between tests.

For unit testing, you can make use of SparkContext.parallelize to set up
your test inputs and RDD.collect to retrieve the outputs.


On Wed, Oct 1, 2014 at 7:43 PM, Ashish Jain  wrote:

> Hello Sanjay,
>
> This can be done, and is a very effective way to debug.
>
> 1) Compile and package your project to get a fat jar
> 2) In your SparkConf use setJars and give location of this jar. Also set
> your master here as local in SparkConf
> 3) Use this SparkConf when creating JavaSparkContext
> 4) Debug your program like you would any normal program.
>
> Hope this helps.
>
> Thanks
> Ashish
> On Oct 1, 2014 4:35 PM, "Sanjay Subramanian"
>  wrote:
>
>> hey guys
>>
>> Is there a way to run Spark in local mode from within Eclipse.
>> I am running Eclipse Kepler on a Macbook Pro with Mavericks
>> Like one can run hadoop map/reduce applications from within Eclipse and
>> debug and learn.
>>
>> thanks
>>
>> sanjay
>>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to get SparkContext inside mapPartitions?

2014-10-01 Thread Daniel Siegmann
I don't think you can get a SparkContext inside an RDD function (such as
mapPartitions), but you shouldn't need to. Have you considered returning
the data read from the database from mapPartitions to create a new RDD and
then just save it to a file like normal?

For example:

rddObject.mapPartitions(x => {
x.map(getDataFromDB(_))
}, true).saveAsTextFile("hdfs:///some-folder/")

Does that make sense?

On Wed, Oct 1, 2014 at 12:52 AM, Henry Hung  wrote:

>  Hi All,
>
>
>
> A noob question:
>
> How to get SparkContext inside mapPartitions?
>
>
>
> Example:
>
>
>
> Let’s say I have rddObjects that can be split into different partitions to
> be assigned to multiple executors, to speed up the export data from
> database.
>
>
>
> Variable sc is created in the main program using these steps:
>
> val conf = new SparkConf().setAppName("ETCH VM Get FDC")
>
> val sc = new SparkContext(conf)
>
>
>
> and here is the mapPartitions code:
>
> rddObject.mapPartitions(x => {
>
>   val uuid =  java.util.UUID.randomUUID.toString
>
>   val path = new org.apache.hadoop.fs.Path("hdfs:///some-folder/" + uuid)
>
>   val fs = path.getFileSystem(sc.hadoopConfiguration)
>
>   val pw = new PrintWriter(fs.create(path))
>
>   while (x.hasNext) {
>
> // … do something here, like fetch data from database and write it to
> hadoop file
>
> pw.println(getDataFromDB(x.next))
>
>   }
>
> })
>
>
>
> My question is how can I use sc to get the hadoopConfiguration, thus
> enable me to create Hadoop file?
>
>
>
> Best regards,
>
> Henry
>
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: about partition number

2014-09-29 Thread Daniel Siegmann
A "task" is the work to be done on a partition for a given stage - you
should expect the number of tasks to be equal to the number of partitions
in each stage, though a task might need to be rerun (due to failure or need
to recompute some data).

2-4 times the cores in your cluster should be a good starting place. Then
you can try different values and see how it affects your performance.
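
For example, with 8 workers x 2 cores = 16 cores, something in the 32-64
range is a reasonable starting point (the exact value is something to tune):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("partitioning-example") // name is made up
  .set("spark.default.parallelism", "48") // ~3x the 16 cores in the cluster
val sc = new SparkContext(conf)
// Or override it per operation, e.g. rdd.reduceByKey(_ + _, 48)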

On Mon, Sep 29, 2014 at 5:01 PM, anny9699  wrote:

> Hi,
>
> I read the past posts about partition number, but am still a little
> confused
> about partitioning strategy.
>
> I have a cluster with 8 works and 2 cores for each work. Is it true that
> the
> optimal partition number should be 2-4 * total_coreNumber or should
> approximately equal to total_coreNumber? Or it's the task number that
> really
> determines the speed rather then partition number?
>
> Thanks a lot!
>
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: How to do operations on multiple RDD's

2014-09-26 Thread Daniel Siegmann
There are numerous ways to combine RDDs. In your case, it seems you have
several RDDs of the same type and you want to do an operation across all of
them as if they were a single RDD. The way to do this is SparkContext.union
or RDD.union, which have minimal overhead. The only difference between
these is the latter allows you to only union two at a time (but of course
you can just call reduce on your sequence to union them all).

Keep in mind this won't repartition anything, so if you find you have too
many partitions after the union you could use RDD.coalesce to merge them.
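
For example, a small sketch with three RDDs standing in for your Seq
(assuming sc as in the spark-shell):

val a = sc.parallelize(Seq(Array(1, 2, 3)))
val b = sc.parallelize(Seq(Array(0, 0, 0)))
val c = sc.parallelize(Seq(Array(1, 2, 0)))

val all = sc.union(Seq(a, b, c)) // or: Seq(a, b, c).reduce(_ union _)
val merged = all.coalesce(2) // optional: merge the accumulated partitions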

On Fri, Sep 26, 2014 at 11:55 AM, Johan Stenberg 
wrote:

> Hi,
>
> This is my first post to the email list so give me some feedback if I do
> something wrong.
>
> To do operations on two RDD's to produce a new one you can just use
> zipPartitions, but if I have an arbitrary number of RDD's that I would like
> to perform an operation on to produce a single RDD, how do I do that? I've
> been reading the docs but haven't found anything.
>
> For example: if I have a Seq of RDD[Array[Int]]'s and I want to take the
> majority of each array cell. So if all RDD's have one array which are like
> this:
>
> [1, 2, 3]
> [0, 0, 0]
> [1, 2, 0]
>
> Then the resulting RDD would have the array [1, 2, 0]. How do I approach
> this problem? It becomes too heavy to have an accumulator variable I guess?
> Otherwise it could be an array of maps with values as keys and frequency as
> values.
>
> Essentially I want something like zipPartitions but for arbitrarily many
> RDD's, is there any such functionality or how would I approach this problem?
>
> Cheers,
>
> Johan
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: mappartitions data size

2014-09-26 Thread Daniel Siegmann
Use RDD.repartition (see here:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
).

On Fri, Sep 26, 2014 at 10:19 AM, jamborta  wrote:

> Hi all,
>
> I am using mappartitions to do some heavy computing on subsets of the data.
> I have a dataset with about 1m rows, running on a 32 core cluster.
> Unfortunately, is seems that mappartitions splits the data into two sets so
> it is only running on two cores.
>
> Is there a way to force it to split into smaller chunks?
>
> thanks,
>
>
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Spark as a Library

2014-09-16 Thread Daniel Siegmann
You can create a new SparkContext inside your container pointed to your
master. However, for your script to run you must call addJars to put the
code on your workers' classpaths (except when running locally).

Hopefully your webapp has some lib folder which you can point to as a
source for the jars. In the Play Framework you can use
play.api.Play.application.getFile("lib") to get a path to the lib directory
and get the contents. Of course that only works on the packaged web app.
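
For example, a rough sketch (the master URL and jar path are made up):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://your-master-host:7077") // standalone master, no spark-submit
  .setAppName("webapp-spark-jobs")
  .setJars(Seq("/path/to/webapp/lib/spark-jobs.jar")) // code the workers must load
val sc = new SparkContext(conf)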

On Tue, Sep 16, 2014 at 3:17 PM, Ruebenacker, Oliver A <
oliver.ruebenac...@altisource.com> wrote:

>
>
>  Hello,
>
>
>
>   Thanks for the response and great to hear it is possible. But how do I
> connect to Spark without using the submit script?
>
>
>
>   I know how to start up a master and some workers and then connect to the
> master by packaging the app that contains the SparkContext and then
> submitting the package with the spark-submit script in standalone-mode. But
> I don’t want to submit the app that contains the SparkContext via the
> script, because I want that app to be running on a web server. So, what are
> other ways to connect to Spark? I can’t find in the docs anything other
> than using the script. Thanks!
>
>
>
>  Best, Oliver
>
>
>
> *From:* Matei Zaharia [mailto:matei.zaha...@gmail.com]
> *Sent:* Tuesday, September 16, 2014 1:31 PM
> *To:* Ruebenacker, Oliver A; user@spark.apache.org
> *Subject:* Re: Spark as a Library
>
>
>
> If you want to run the computation on just one machine (using Spark's
> local mode), it can probably run in a container. Otherwise you can create a
> SparkContext there and connect it to a cluster outside. Note that I haven't
> tried this though, so the security policies of the container might be too
> restrictive. In that case you'd have to run the app outside and expose an
> RPC interface between them.
>
>
>
> Matei
>
>
>
> On September 16, 2014 at 8:17:08 AM, Ruebenacker, Oliver A (
> oliver.ruebenac...@altisource.com) wrote:
>
>
>
>  Hello,
>
>
>
>   Suppose I want to use Spark from an application that I already submit to
> run in another container (e.g. Tomcat). Is this at all possible? Or do I
> have to split the app into two components, and submit one to Spark and one
> to the other container? In that case, what is the preferred way for the two
> components to communicate with each other? Thanks!
>
>
>
>  Best, Oliver
>
>
>
> Oliver Ruebenacker | Solutions Architect
>
>
>
> Altisource™
>
> 290 Congress St, 7th Floor | Boston, Massachusetts 02210
>
> P: (617) 728-5582 | ext: 275585
>
> oliver.ruebenac...@altisource.com | www.Altisource.com
>
>
>
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Filter function problem

2014-09-09 Thread Daniel Siegmann
You should not be broadcasting an RDD. You also should not be passing an
RDD in a lambda to another RDD. If you want, you can call RDD.collect and then
broadcast those values (of course you must be able to fit all those values
in memory).
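
For example, here is a minimal sketch of the collect-then-broadcast pattern
(the data is made up; assumes sc as in the spark-shell):

val inactiveIds = sc.parallelize(Seq(2L, 4L)) // the small RDD
val events = sc.parallelize(Seq((1L, "click"), (2L, "view"), (3L, "click")))

// Pull the small RDD's values to the driver and broadcast those, not the RDD.
val inactiveSet = sc.broadcast(inactiveIds.collect().toSet)
val active = events.filter { case (id, _) => !inactiveSet.value.contains(id) }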

On Tue, Sep 9, 2014 at 6:34 AM, Blackeye  wrote:

> In order to help anyone to answer, I could say that I checked the
> inactiveIDs.filter operation separately, and I found that it doesn't return
> null in any case. In addition I don't know how to handle (or check) whether
> an RDD is null. I find the debugging too complicated to pinpoint the error.
> Any ideas how to find the null pointer?
>
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Where to save intermediate results?

2014-09-02 Thread Daniel Siegmann
I don't have any personal experience with Spark Streaming. Whether you
store your data in HDFS or a database or something else probably depends on
the nature of your use case.


On Fri, Aug 29, 2014 at 10:38 AM, huylv 
wrote:

> Hi Daniel,
>
> Your suggestion is definitely an interesting approach. In fact, I already
> have another system to deal with the stream analytical processing part. So
> basically, the Spark job to aggregate data just accumulatively computes
> aggregations from historical data together with new batch, which has been
> partly summarized by the stream processor. Answering queries involves in
> combining pre-calculated historical data together with on-stream
> aggregations. This sounds much like what Spark Streaming is intended to do.
> So I'll take a look deeper into Spark Streaming to consider porting the
> stream processing part to use Spark Streaming.
>
> Regarding saving pre-calculated data onto external storages (disk,
> database...), I'm looking at Cassandra for now. But I don't know how it
> fits
> into my context and how is its performance compared to saving to files in
> HDFS. Also, is there anyway to keep the precalculated data both on disk and
> on memory, so that when the batch job terminated, historical data still
> available on memory for combining with stream processor, while still be
> able
> to survive system failure or upgrade? Not to mention the size of
> precalculated data might get too big, in that case, partly store newest
> data
> on memory only would be better. Tachyon looks like a nice option but again,
> I don't have experience with it and it's still an experimental feature of
> Spark.
>
> Regards,
> Huy
>
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Where to save intermediate results?

2014-08-28 Thread Daniel Siegmann
I assume your on-demand calculations are a streaming flow? If your data
aggregated from batch isn't too large, maybe you should just save it to
disk; when your streaming flow starts you can read the aggregations back
from disk and perhaps just broadcast them. Though I guess you'd have to
restart your streaming flow when these aggregations are updated.

For something more sophisticated, maybe look at Redis <http://redis.io/> or
some distributed database? Your ETL can update that store, and your
on-demand job can query it.
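
For example, a rough sketch of the save-and-reload idea (paths and data
shapes are made up):

// Batch job: write the aggregations somewhere durable.
val aggregates = sc.parallelize(Seq(("page-1", 42L), ("page-2", 7L)))
aggregates.saveAsObjectFile("hdfs:///aggregates/latest")

// Later job: read them back, collect (assuming they are small), and broadcast.
val reloaded = sc.objectFile[(String, Long)]("hdfs:///aggregates/latest").collect().toMap
val aggBroadcast = sc.broadcast(reloaded)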


On Thu, Aug 28, 2014 at 4:30 PM, huylv  wrote:

> Hi,
>
> I'm building a system for near real-time data analytics. My plan is to have
> an ETL batch job which calculates aggregations running periodically. User
> queries are then parsed for on-demand calculations, also in Spark. Where
> are
> the pre-calculated results supposed to be saved? I mean, after finishing
> aggregations, the ETL job will terminate, so caches are wiped out of
> memory.
> How can I use these results to calculate on-demand queries? Or more
> generally, could you please give me a good way to organize the data flow
> and
> jobs in order to achieve this?
>
> I'm new to Spark so sorry if this might sound like a dumb question.
>
> Thank you.
> Huy
>
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Q on downloading spark for standalone cluster

2014-08-28 Thread Daniel Siegmann
If you aren't using Hadoop, I don't think it matters which you download.
I'd probably just grab the Hadoop 2 package.

Out of curiosity, what are you using as your data store? I get the
impression most Spark users are using HDFS or something built on top.


On Thu, Aug 28, 2014 at 4:07 PM, Sanjeev Sagar <
sanjeev.sa...@mypointscorp.com> wrote:

> Hello there,
>
> I've a basic question on the download: which option do I need to
> download for a standalone cluster?
>
> I've a private cluster of three machines on CentOS. When I click on
> download it shows me the following:
>
>
>Download Spark
>
> The latest release is Spark 1.0.2, released August 5, 2014 (release notes)
> <http://spark.apache.org/releases/spark-release-1-0-2.html> (git tag) <
> https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=
> 8fb6f00e195fb258f3f70f04756e07c259a2351f>
>
> Pre-built packages:
>
>  * For Hadoop 1 (HDP1, CDH3): find an Apache mirror
><http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/
> spark-1.0.2-bin-hadoop1.tgz>
>or direct file download
><http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop1.tgz>
>  * For CDH4: find an Apache mirror
><http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/
> spark-1.0.2-bin-cdh4.tgz>
>or direct file download
><http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz>
>  * For Hadoop 2 (HDP2, CDH5): find an Apache mirror
><http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/
> spark-1.0.2-bin-hadoop2.tgz>
>or direct file download
><http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop2.tgz>
>
> Pre-built packages, third-party (NOTE: may include non ASF-compatible
> licenses):
>
>  * For MapRv3: direct file download (external)
><http://package.mapr.com/tools/apache-spark/1.0.2/
> spark-1.0.2-bin-mapr3.tgz>
>  * For MapRv4: direct file download (external)
><http://package.mapr.com/tools/apache-spark/1.0.2/
> spark-1.0.2-bin-mapr4.tgz>
>
>
> From the above it looks like I have to download Hadoop or CDH4 first in
> order to use Spark? I've a standalone cluster and my data size is also
> in the hundreds of gigabytes, or close to a terabyte.
>
> I don't get which one I need to download from the above list.
>
> Could someone tell me which one I need to download for a standalone
> cluster and a big data footprint?
>
> Or is Hadoop needed or mandatory for using Spark? That's not the
> understanding I have. My understanding is that you can use Spark with Hadoop
> via YARN 2 if you like, but you can also use Spark standalone without
> Hadoop.
>
> Please assist. I'm confused!
>
> -Sanjeev
>
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Development environment issues

2014-08-25 Thread Daniel Siegmann
On Thu, Aug 21, 2014 at 6:21 PM, pierred  wrote:

So, what is the accepted wisdom in terms of IDE and development environment?
>

I don't know what the accepted wisdom is. I've been getting by with the
Scala IDE for Eclipse, though I am using the stable version - as you noted,
this keeps me from upgrading to the latest Eclipse version. The quality of
the Scala IDE is poor, but I have found it generally usable. I generate the
Eclipse project files from SBT. Debugging does work (mostly) - just be
aware you can't easily step into a lambda, so it's easiest to add a
breakpoint inside of it.

As for unit testing, both Specs2 and ScalaTest work, and I can run
individual tests within Eclipse. For Specs2 there is an Eclipse plugin, and
for ScalaTest you can annotate your tests with
@RunWith(classOf[JUnitRunner]) and it'll work in the usual JUnit tools. I
have automated tests running in Bamboo. Took a bit of wrangling to get the
test output picked up, but it works.
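
For example, the annotation looks like this (the suite name is made up):

import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class MyPipelineSpec extends FunSuite {
  test("example") {
    assert(1 + 1 == 2)
  }
}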


> Is there a good tutorial to set things up so that one half of the
> libraries/tools doesn't break the other half?
>

No idea.


> What do you guys use?
> scala 2.10 or 2.11?
> sbt or maven?
> eclipse or idea?
> jdk7 or 8?
>

I'm using Java 7 and Scala 2.10.x (not every framework I use supports later
versions). SBT because I use the Play Framework, but I miss Maven. I
haven't tried IntelliJ's Scala support, but it's probably worth a shot.

The tooling isn't nearly as solid as what Java has, but I make do.

-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: heterogeneous cluster hardware

2014-08-21 Thread Daniel Siegmann
If you use Spark standalone, you could start multiple workers on some
machines. Size your worker configuration to be appropriate for the weak
machines, and start multiple on your beefier machines.

It may take a bit of work to get that all hooked up - probably you'll want
to write some scripts to start everything on all your nodes correctly. But
hopefully it will work smoothly once the cluster is up and running.


On Thu, Aug 21, 2014 at 11:42 AM, anthonyjschu...@gmail.com <
anthonyjschu...@gmail.com> wrote:

> Jörn, thanks for the post...
>
> Unfortunately, I am stuck with the hardware I have and might not be
> able to get budget allocated for a new stack of servers when I've
> already got so many "ok" servers on hand... And even more
> unfortunately, a large subset of these machines are... shall we say...
> extremely humble in their cpus and ram. My group has exclusive access
> to the machine and rarely do we need to run concurrent jobs-- What I
> really want is max capacity per-job. The applications are massive
> machine-learning experiments, so I'm not sure about the feasibility of
> breaking it up into concurrent jobs. At this point, I am seriously
> considering dropping down to Akka-level programming. Why, oh why,
> doesn't spark allow for allocating variable worker threads per host?
> this would seem to be the correct point of abstraction that would
> allow the construction of massive clusters using "on-hand" hardware?
> (the scheduler probably wouldn't have to change at all)
>
> On Thu, Aug 21, 2014 at 9:25 AM, Jörn Franke [via Apache Spark User
> List] wrote:
>
> > Hi,
> >
> > Well, you could use Mesos or Yarn2 to define  resources per Job - you
> can
> > give only as much resources (cores, memory etc.) per machine as your
> "worst"
> > machine has. The rest is done by Mesos or Yarn. By doing this you avoid
> a
> > per machine resource assignment without any disadvantages. You can run
> > without any problems run other jobs in parallel and older machines won't
> get
> > overloaded.
> >
> > however, you should take care that your cluster does not get too
> > heterogeneous.
> >
> > Best regards,
> > Jörn
> >
> > On 21 August 2014 at 16:55, "[hidden email]" <[hidden email]> wrote:
> >>
> >> I've got a stack of Dell Commodity servers-- Ram~>(8 to 32Gb) single or
> >> dual
> >> quad core processor cores per machine. I think I will have them loaded
> >> with
> >> CentOS. Eventually, I may want to add GPUs on the nodes to handle
> linear
> >> alg. operations...
> >>
> >> My Idea has been:
> >>
> >> 1) to find a way to configure Spark to allocate different resources
> >> per-machine, per-job. -- at least have a "standard executor"... and
> allow
> >> different machines to have different numbers of executors.
> >>
> >> 2) make (using vanilla spark) a pre-run optimization phase which
> >> benchmarks
> >> the throughput of each node (per hardware), and repartition the dataset
> to
> >> more efficiently use the hardware rather than rely on Spark
> Speculation--
> >> which has always seemed a dis-optimal way to balance the load across
> >> several
> >> differing machines.
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context:
> >>
> http://apache-spark-user-list.1001560.n3.nabble.com/heterogeneous-cluster-hardware-tp11567p12581.html
> >> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>
> >> -
> >> To unsubscribe, e-mail: [hidden email]
> >> For additional commands, e-mail: [hidden email]
> >>
> >
> >
> > 
>
>
>
> --
> A  N  T  H  O  N  Y   Ⓙ   S  C  H  U  L  T  E
>
> --
> View this message in context: Re: heterogeneous cluster hardware
> <http://apache-spark-user-list.1001560.n3.nabble.com/heterogeneous-cluster-hardware-tp11567p12587.html>
>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Ways to partition the RDD

2014-08-14 Thread Daniel Siegmann
There may be cases where you want to adjust the number of partitions or
explicitly call RDD.repartition or RDD.coalesce. However, I would start
with the defaults and then adjust if necessary to improve performance (for
example, if cores are idling because there aren't enough tasks you may want
more partitions).

Looks like PairRDDFunctions has a countByKey (though you'd still need to
distinct first). You might also look at combineByKey and foldByKey as
alternatives to reduceByKey or groupByKey.
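
For instance, a minimal sketch (pageVisits here is a hypothetical
RDD[(String, String)] of (pageId, userId) pairs):

import org.apache.spark.SparkContext._   // brings the pair RDD functions into scope

val uniqueVisitorsPerPage: scala.collection.Map[String, Long] =
   pageVisits.distinct()    // count each (pageId, userId) pair only once
             .countByKey()  // note: this returns a Map on the driver, so keep the key space small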


On Thu, Aug 14, 2014 at 4:14 PM, bdev  wrote:

> Thanks Daniel for the detailed information. Since the RDD is already
> partitioned, there is no need to worry about repartitioning.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Ways-to-partition-the-RDD-tp12083p12136.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Ways to partition the RDD

2014-08-14 Thread Daniel Siegmann
First, I think you might have a misconception about partitioning. ALL RDDs
are partitioned (even if they are a single partition). When reading from
HDFS the number of partitions depends on how the data is stored in HDFS.
After data is shuffled (generally caused by things like reduceByKey), the
number of partitions will be whatever is set as the default parallelism
(see https://spark.apache.org/docs/latest/configuration.html). Most of
these methods allow you to specify a different number of partitions as a
parameter.
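
For example, a quick sketch (wordCounts is a hypothetical RDD of (word, count)
pairs):

import org.apache.spark.SparkContext._   // brings the pair RDD functions into scope

val summed = wordCounts.reduceByKey((a, b) => a + b, 48)  // the shuffled RDD will have 48 partitions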

The number of partitions == the number of tasks. So generally you want enough
partitions to keep all your cores busy (it's the number of cores that matters,
not the number of nodes).

Also, to create a key you can use the map method to return a Tuple2 as
Santosh showed. You can also use keyBy.

If you just want to get the number of unique users, I would do something
like this:

val userIdColIndex: Int = ...
val pageIdColIndex: Int = ...
val inputPath: String = ...
// I'm assuming the user and page ID are both strings
val pageVisitorRdd: RDD[(String, String)] = sc.textFile(inputPath).map { record =>
   val colValues = record.split('\t')
   // You might want error handling in here - I'll assume all records are valid
   val userId = colValues(userIdColIndex)
   val pageId = colValues(pageIdColIndex)
   (pageId, userId) // Here's your key-value pair
}

So now that you have your pageId -> userId mappings, what to do with them?
Maybe the most obvious would be:

val uniquePageVisits: RDD[(String, Int)] =
   pageVisitorRdd.groupByKey().mapValues(_.toSet.size)

But groupByKey will be a bad choice if you have many visits per page
(you'll end up with a large collection in each record). It might be better
to start with a distinct, then map to (pageId, 1) and reduceByKey(_+_) to
get the sums.
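
A sketch of that approach, reusing pageVisitorRdd from above:

val uniquePageVisits: RDD[(String, Int)] =
   pageVisitorRdd.distinct()                          // one record per (pageId, userId)
                 .map { case (pageId, _) => (pageId, 1) }
                 .reduceByKey(_ + _)                  // per-page sums, no large per-key collections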

I hope that helps.


On Thu, Aug 14, 2014 at 2:14 PM, bdev  wrote:

> Thanks, will give that a try.
>
> I see the number of partitions requested is 8 (through HashPartitioner(8)).
> If I have a 40 node cluster, whats the recommended number of partitions?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Ways-to-partition-the-RDD-tp12083p12128.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Number of partitions and Number of concurrent tasks

2014-08-01 Thread Daniel Siegmann
It is definitely possible to run multiple workers on a single node and have
each worker with the maximum number of cores (e.g. if you have 8 cores and
2 workers you'd have 16 cores per node). I don't know if it's possible with
the out of the box scripts though.

It's actually not really that difficult. You just run start-slave.sh
multiple times on the same node, with different IDs. Here is the usage:

# Usage: start-slave.sh <worker#> <master-spark-URL>

But we have custom scripts to do that. I'm not sure whether it is possible
using the standard start-all.sh script or that EC2 script. Probably not.

I haven't set up or managed such a cluster myself, so that's about the
extent of my knowledge. But I've deployed jobs to that cluster and enjoyed
the benefit of double the cores - we had a fair amount of I/O though, which
may be why it helped in our case. I recommend taking a look at the CPU
utilization on the nodes when running a flow before jumping through these
hoops.


On Fri, Aug 1, 2014 at 12:05 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Darin,
>
> I think the number of cores in your cluster is a hard limit on how many
> concurrent tasks you can execute at one time. If you want more parallelism,
> I think you just need more cores in your cluster--that is, bigger nodes, or
> more nodes.
>
> Daniel,
>
> Have you been able to get around this limit?
>
> Nick
>
>
>
> On Fri, Aug 1, 2014 at 11:49 AM, Daniel Siegmann  > wrote:
>
>> Sorry, but I haven't used Spark on EC2 and I'm not sure what the problem
>> could be. Hopefully someone else will be able to help. The only thing I
>> could suggest is to try setting both the worker instances and the number of
>> cores (assuming spark-ec2 has such a parameter).
>>
>>
>> On Thu, Jul 31, 2014 at 3:03 PM, Darin McBeath 
>> wrote:
>>
>>> Ok, I set the number of spark worker instances to 2 (below is my startup
>>> command).  But, this essentially had the effect of increasing my number of
>>> workers from 3 to 6 (which was good) but it also reduced my number of cores
>>> per worker from 8 to 4 (which was not so good).  In the end, I would still
>>> only be able to concurrently process 24 partitions in parallel.  I'm
>>> starting a stand-alone cluster using the spark provided ec2 scripts .  I
>>> tried setting the env variable for SPARK_WORKER_CORES in the spark_ec2.py
>>> but this had no effect. So, it's not clear if I could even set the
>>> SPARK_WORKER_CORES with the ec2 scripts.  Anyway, not sure if there is
>>> anything else I can try but at least wanted to document what I did try and
>>> the net effect.  I'm open to any suggestions/advice.
>>>
>>>  ./spark-ec2 -k *key* -i key.pem --hadoop-major-version=2 launch -s 3
>>> -t m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2
>>> *my-cluster*
>>>
>>>
>>>   --
>>>  *From:* Daniel Siegmann 
>>> *To:* Darin McBeath 
>>> *Cc:* Daniel Siegmann ; "user@spark.apache.org"
>>> 
>>> *Sent:* Thursday, July 31, 2014 10:04 AM
>>>
>>> *Subject:* Re: Number of partitions and Number of concurrent tasks
>>>
>>> I haven't configured this myself. I'd start with setting
>>> SPARK_WORKER_CORES to a higher value, since that's a bit simpler than
>>> adding more workers. This defaults to "all available cores" according to
>>> the documentation, so I'm not sure if you can actually set it higher. If
>>> not, you can get around this by adding more worker instances; I believe
>>> simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient.
>>>
>>> I don't think you *have* to set the cores if you have more workers - it
>>> will default to 8 cores per worker (in your case). But maybe 16 cores per
>>> node will be too many. You'll have to test. Keep in mind that more workers
>>> means more memory and such too, so you may need to tweak some other
>>> settings downward in this case.
>>>
>>> On a side note: I've read some people found performance was better when
>>> they had more workers with less memory each, instead of a single worker
>>> with tons of memory, because it cut down on garbage collection time. But I
>>> can't speak to that myself.
>>>
>>> In any case, if you increase the number of cores available in your
>>> cluster (whether per worker, or adding more workers per node, or of course
>>> adding more nodes) you should see more tasks running

Re: Number of partitions and Number of concurrent tasks

2014-08-01 Thread Daniel Siegmann
Sorry, but I haven't used Spark on EC2 and I'm not sure what the problem
could be. Hopefully someone else will be able to help. The only thing I
could suggest is to try setting both the worker instances and the number of
cores (assuming spark-ec2 has such a parameter).


On Thu, Jul 31, 2014 at 3:03 PM, Darin McBeath  wrote:

> Ok, I set the number of spark worker instances to 2 (below is my startup
> command).  But, this essentially had the effect of increasing my number of
> workers from 3 to 6 (which was good) but it also reduced my number of cores
> per worker from 8 to 4 (which was not so good).  In the end, I would still
> only be able to concurrently process 24 partitions in parallel.  I'm
> starting a stand-alone cluster using the spark provided ec2 scripts .  I
> tried setting the env variable for SPARK_WORKER_CORES in the spark_ec2.py
> but this had no effect. So, it's not clear if I could even set the
> SPARK_WORKER_CORES with the ec2 scripts.  Anyway, not sure if there is
> anything else I can try but at least wanted to document what I did try and
> the net effect.  I'm open to any suggestions/advice.
>
>  ./spark-ec2 -k *key* -i key.pem --hadoop-major-version=2 launch -s 3 -t
> m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2
> *my-cluster*
>
>
>   ------
>  *From:* Daniel Siegmann 
> *To:* Darin McBeath 
> *Cc:* Daniel Siegmann ; "user@spark.apache.org"
> 
> *Sent:* Thursday, July 31, 2014 10:04 AM
>
> *Subject:* Re: Number of partitions and Number of concurrent tasks
>
> I haven't configured this myself. I'd start with setting
> SPARK_WORKER_CORES to a higher value, since that's a bit simpler than
> adding more workers. This defaults to "all available cores" according to
> the documentation, so I'm not sure if you can actually set it higher. If
> not, you can get around this by adding more worker instances; I believe
> simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient.
>
> I don't think you *have* to set the cores if you have more workers - it
> will default to 8 cores per worker (in your case). But maybe 16 cores per
> node will be too many. You'll have to test. Keep in mind that more workers
> means more memory and such too, so you may need to tweak some other
> settings downward in this case.
>
> On a side note: I've read some people found performance was better when
> they had more workers with less memory each, instead of a single worker
> with tons of memory, because it cut down on garbage collection time. But I
> can't speak to that myself.
>
> In any case, if you increase the number of cores available in your cluster
> (whether per worker, or adding more workers per node, or of course adding
> more nodes) you should see more tasks running concurrently. Whether this
> will actually be *faster* probably depends mainly on whether the CPUs in
> your nodes were really being fully utilized with the current number of
> cores.
>
>
> On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath 
> wrote:
>
> Thanks.
>
> So to make sure I understand.  Since I'm using a 'stand-alone' cluster, I
> would set SPARK_WORKER_INSTANCES to something like 2 (instead of the
> default value of 1).  Is that correct?  But, it also sounds like I need to
> explicitly set a value for SPARKER_WORKER_CORES (based on what the
> documentation states).  What would I want that value to be based on my
> configuration below?  Or, would I leave that alone?
>
>   --
>  *From:* Daniel Siegmann 
> *To:* user@spark.apache.org; Darin McBeath 
> *Sent:* Wednesday, July 30, 2014 5:58 PM
> *Subject:* Re: Number of partitions and Number of concurrent tasks
>
> This is correct behavior. Each "core" can execute exactly one task at a
> time, with each task corresponding to a partition. If your cluster only has
> 24 cores, you can only run at most 24 tasks at once.
>
> You could run multiple workers per node to get more executors. That would
> give you more cores in the cluster. But however many cores you have, each
> core will run only one task at a time.
>
>
> On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath 
> wrote:
>
>  I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.
>
> I have an RDD which I've repartitioned so it has 100 partitions
> (hoping to increase the parallelism).
>
> When I do a transformation (such as filter) on this RDD, I can't  seem to
> get more than 24 tasks (my total number of cores across the 3 nodes) going
> at one point in time.  By tasks, I mean the number of tasks that appear
> under the Application UI.  I tried explicitly sett

Re: Number of partitions and Number of concurrent tasks

2014-07-31 Thread Daniel Siegmann
I haven't configured this myself. I'd start with setting SPARK_WORKER_CORES
to a higher value, since that's a bit simpler than adding more workers.
This defaults to "all available cores" according to the documentation, so
I'm not sure if you can actually set it higher. If not, you can get around
this by adding more worker instances; I believe simply setting
SPARK_WORKER_INSTANCES to 2 would be sufficient.

I don't think you *have* to set the cores if you have more workers - it
will default to 8 cores per worker (in your case). But maybe 16 cores per
node will be too many. You'll have to test. Keep in mind that more workers
means more memory and such too, so you may need to tweak some other
settings downward in this case.

On a side note: I've read some people found performance was better when
they had more workers with less memory each, instead of a single worker
with tons of memory, because it cut down on garbage collection time. But I
can't speak to that myself.

In any case, if you increase the number of cores available in your cluster
(whether per worker, or adding more workers per node, or of course adding
more nodes) you should see more tasks running concurrently. Whether this
will actually be *faster* probably depends mainly on whether the CPUs in
your nodes were really being fully utilized with the current number of
cores.


On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath  wrote:

> Thanks.
>
> So to make sure I understand.  Since I'm using a 'stand-alone' cluster, I
> would set SPARK_WORKER_INSTANCES to something like 2 (instead of the
> default value of 1).  Is that correct?  But, it also sounds like I need to
> explicitly set a value for SPARKER_WORKER_CORES (based on what the
> documentation states).  What would I want that value to be based on my
> configuration below?  Or, would I leave that alone?
>
>   --
>  *From:* Daniel Siegmann 
> *To:* user@spark.apache.org; Darin McBeath 
> *Sent:* Wednesday, July 30, 2014 5:58 PM
> *Subject:* Re: Number of partitions and Number of concurrent tasks
>
> This is correct behavior. Each "core" can execute exactly one task at a
> time, with each task corresponding to a partition. If your cluster only has
> 24 cores, you can only run at most 24 tasks at once.
>
> You could run multiple workers per node to get more executors. That would
> give you more cores in the cluster. But however many cores you have, each
> core will run only one task at a time.
>
>
> On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath 
> wrote:
>
>  I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.
>
> I have an RDD which I've repartitioned so it has 100 partitions
> (hoping to increase the parallelism).
>
> When I do a transformation (such as filter) on this RDD, I can't  seem to
> get more than 24 tasks (my total number of cores across the 3 nodes) going
> at one point in time.  By tasks, I mean the number of tasks that appear
> under the Application UI.  I tried explicitly setting the
> spark.default.parallelism to 48 (hoping I would get 48 tasks concurrently
> running) and verified this in the Application UI for the running
> application but this had no effect.  Perhaps, this is ignored for a
> 'filter' and the default is the total number of cores available.
>
> I'm fairly new with Spark so maybe I'm just missing or misunderstanding
> something fundamental.  Any help would be appreciated.
>
> Thanks.
>
> Darin.
>
>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Number of partitions and Number of concurrent tasks

2014-07-30 Thread Daniel Siegmann
This is correct behavior. Each "core" can execute exactly one task at a
time, with each task corresponding to a partition. If your cluster only has
24 cores, you can only run at most 24 tasks at once.

You could run multiple workers per node to get more executors. That would
give you more cores in the cluster. But however many cores you have, each
core will run only one task at a time.
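
A quick way to see the relationship in the shell (a sketch; the path is made
up):

val rdd = sc.textFile("path/to/input").repartition(100)
rdd.partitions.size  // 100 partitions -> 100 tasks for a stage over this RDD,
                     // but only as many run at once as there are cores in the cluster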


On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath  wrote:

> I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1.
>
> I have an RDD which I've repartitioned so it has 100 partitions
> (hoping to increase the parallelism).
>
> When I do a transformation (such as filter) on this RDD, I can't  seem to
> get more than 24 tasks (my total number of cores across the 3 nodes) going
> at one point in time.  By tasks, I mean the number of tasks that appear
> under the Application UI.  I tried explicitly setting the
> spark.default.parallelism to 48 (hoping I would get 48 tasks concurrently
> running) and verified this in the Application UI for the running
> application but this had no effect.  Perhaps, this is ignored for a
> 'filter' and the default is the total number of cores available.
>
> I'm fairly new with Spark so maybe I'm just missing or misunderstanding
> something fundamental.  Any help would be appreciated.
>
> Thanks.
>
> Darin.
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Unit Testing (JUnit) with Spark

2014-07-29 Thread Daniel Siegmann
Sonal's suggestion of looking at the JavaAPISuite is a good idea. Just a
few things to note.

Pay special attention to what's being done in the setUp and tearDown
methods, because that's where the magic is happening. To unit test against
Spark, pretty much all you need to do is create a context running against
the "local" master (or "local[*]", etc.). But note that the context is
stopped in the tearDown method - this is critical, because if you have
multiple local contexts running you can get some very odd errors.

If your tests run in parallel, you'll need to share a context instead (via
some sort of shared reference). It isn't strictly necessary (I think) to
stop the context in this case, since it will be killed when the JVM shuts
down.

In your tests you can create input data using SparkContext.parallelize and
retrieve the output data using RDD.collect (or other actions).
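
To make that concrete, here's a minimal sketch (in Scala, though the same
shape applies to the Java API): a local context created in setUp, stopped in
tearDown, and parallelize/collect for input and output. The class name and the
logic under test are just placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.{After, Before, Test}
import org.junit.Assert.assertEquals

class WordLengthSuite {
  private var sc: SparkContext = _

  @Before def setUp(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  }

  @After def tearDown(): Unit = {
    sc.stop()  // critical - leaving local contexts running leads to odd errors
  }

  @Test def wordLengths(): Unit = {
    val input = sc.parallelize(Seq("spark", "junit"))
    val result = input.map(_.length).collect().toSeq
    assertEquals(Seq(5, 5), result)
  }
}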


On Tue, Jul 29, 2014 at 12:57 PM, Sonal Goyal  wrote:

> You can take a look at
> https://github.com/apache/spark/blob/master/core/src/test/java/org/apache/spark/JavaAPISuite.java
> and model your junits based on it.
>
> Best Regards,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
>
> On Tue, Jul 29, 2014 at 10:10 PM, Kostiantyn Kudriavtsev <
> kudryavtsev.konstan...@gmail.com> wrote:
>
>> Hi,
>>
>> try this one
>> http://simpletoad.blogspot.com/2014/07/runing-spark-unit-test-on-windows-7.html
>>
>> it’s more about fixing windows-specific issue, but code snippet gives
>> general idea
>> just run etl and check output w/ Assert(s)
>>
>> On Jul 29, 2014, at 6:29 PM, soumick86  wrote:
>>
>> > Is there any example out there for unit testing a Spark application in
>> Java?
>> > Even a trivial application like word count will be very helpful. I am
>> very
>> > new to this and I am struggling to understand how I can use JavaSpark
>> > Context for JUnit
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Testing-JUnit-with-Spark-tp10861.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: mapToPair vs flatMapToPair vs flatMap function usage.

2014-07-25 Thread Daniel Siegmann
The map and flatMap methods have a similar purpose, but map is 1 to 1,
while flatMap is 1 to 0-N (outputting 0 is similar to a filter, except of
course it could be outputting a different type).
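
A quick Scala sketch of the difference (mapToPair/flatMapToPair are just the
Java API's pair-producing variants; in Scala you simply return a tuple):

val lines = sc.parallelize(Seq("a b", "c"))

lines.map(_.toUpperCase).collect()      // 1 to 1:   Array("A B", "C")
lines.flatMap(_.split(" ")).collect()   // 1 to 0-N: Array("a", "b", "c")

// Returning a tuple gives a (key, value) pair - the Scala analogue of mapToPair:
lines.flatMap(_.split(" ")).map(word => (word, 1)).collect()  // Array((a,1), (b,1), (c,1))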


On Thu, Jul 24, 2014 at 6:41 PM, abhiguruvayya 
wrote:

> Can any one help me understand the key difference between mapToPair vs
> flatMapToPair vs flatMap functions and also when to apply these functions
> in
> particular.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/mapToPair-vs-flatMapToPair-vs-flatMap-function-usage-tp10617.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Using case classes as keys does not seem to work.

2014-07-22 Thread Daniel Siegmann
I can confirm this bug. The behavior for groupByKey is the same as
reduceByKey - your example is actually grouping on just the name. Try this:

sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
(P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
(P(charly),ArrayBuffer(1)))


On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas  wrote:

> Just to narrow down the issue, it looks like the issue is in 'reduceByKey'
> and derivates like 'distinct'.
>
> groupByKey() seems to work
>
> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
> (abe,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>
>
>
> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas 
> wrote:
>
>> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>>
>> A minimal example:
>>
>> case class P(name:String)
>> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
>> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
>> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
>> (P(bob),1), (P(abe),1), (P(charly),1))
>>
>> In contrast to the expected behavior, that should be equivalent to:
>> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
>> Array[(String, Int)] = Array((charly,1), (abe,1), (bob,2))
>>
>> Any ideas why this doesn't work?
>>
>> -kr, Gerard.
>>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Memory & compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
Depending on how your C++ program is designed, maybe you can feed the data
from multiple partitions into the same process? Getting the results back
might be tricky. But that may be the only way to guarantee you're only
using one invocation per node.
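
One hedged way to sketch that, assuming the C++ program reads records on stdin
and writes results to stdout (data and numNodes are hypothetical):

val oneProcessPerNode = data
  .coalesce(numNodes, shuffle = true)  // merge the original partitions down to roughly one per node
  .pipe("/path/to/cpp_program")        // one external process per (merged) partition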


On Mon, Jul 14, 2014 at 5:12 PM, Matei Zaharia 
wrote:

> I think coalesce with shuffle=true will force it to have one task per
> node. Without that, it might be that due to data locality it decides to
> launch multiple ones on the same node even though the total # of tasks is
> equal to the # of nodes.
>
> If this is the *only* thing you run on the cluster, you could also
> configure the Workers to only report one core by manually launching the
> spark.deploy.worker.Worker process with that flag (see
> http://spark.apache.org/docs/latest/spark-standalone.html).
>
> Matei
>
> On Jul 14, 2014, at 1:59 PM, Daniel Siegmann 
> wrote:
>
> I don't have a solution for you (sorry), but do note that
> rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
> set shuffle=true then it should repartition and redistribute the data.
> But it uses the hash partitioner according to the ScalaDoc - I don't know
> of any way to supply a custom partitioner.
>
>
> On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya  wrote:
>
>> I'm trying to run a job that includes an invocation of a memory &
>> compute-intensive multithreaded C++ program, and so I'd like to run one
>> task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
>> one task per core, and so runs out of memory on the node. Is there any way
>> to give the scheduler a hint that the task uses lots of memory and cores so
>> it spreads it out more evenly?
>>
>> Thanks,
>>
>> Ravi Pandya
>> Microsoft Research
>>
>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegm...@velos.io W: www.velos.io
>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Memory & compute-intensive tasks

2014-07-14 Thread Daniel Siegmann
I don't have a solution for you (sorry), but do note that
rdd.coalesce(numNodes) keeps data on the same nodes where it was. If you
set shuffle=true then it should repartition and redistribute the data. But
it uses the hash partitioner according to the ScalaDoc - I don't know of
any way to supply a custom partitioner.


On Mon, Jul 14, 2014 at 4:09 PM, Ravi Pandya  wrote:

> I'm trying to run a job that includes an invocation of a memory &
> compute-intensive multithreaded C++ program, and so I'd like to run one
> task per physical node. Using rdd.coalesce(# nodes) seems to just allocate
> one task per core, and so runs out of memory on the node. Is there any way
> to give the scheduler a hint that the task uses lots of memory and cores so
> it spreads it out more evenly?
>
> Thanks,
>
> Ravi Pandya
> Microsoft Research
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Can we get a spark context inside a mapper

2014-07-14 Thread Daniel Siegmann
Rahul, I'm not sure what you mean by your question being "very
unprofessional". You can feel free to answer such questions here. You may
or may not receive an answer, and you shouldn't necessarily expect to have
your question answered within five hours.

I've never tried to do anything like your case. I imagine the easiest thing
would be to read and process each file individually, since you are
intending to produce a separate result for each. You could also look at
RDD.wholeTextFiles - maybe that will be of some use if your files are small
- but I don't know of any corresponding save method which would generate
files with different names from within a single RDD.
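
A rough sketch of the per-file approach (all names here are hypothetical, and
I'm assuming each line of a file is a comma-separated feature vector):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

def clusterFile(sc: SparkContext, inputPath: String, outputPath: String, k: Int): Unit = {
  val features = sc.textFile(inputPath)
    .map(line => Vectors.dense(line.split(',').map(_.toDouble)))
    .cache()
  val model = KMeans.train(features, k, 20)  // 20 iterations - just a placeholder setting
  features.map(v => (model.predict(v), v)).saveAsTextFile(outputPath)
}

// Driver-side loop: one job per file, all sharing a single SparkContext.
val inputAndOutputPaths: Seq[(String, String)] = ...  // your (input, output) path pairs
inputAndOutputPaths.foreach { case (in, out) => clusterFile(sc, in, out, k = 5) }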


On Mon, Jul 14, 2014 at 2:30 PM, Rahul Bhojwani  wrote:

> I understand that the question is very unprofessional, but I am a newbie.
> If you could share some link where I can ask such questions, if not here.
>
> But please answer.
>
>
> On Mon, Jul 14, 2014 at 6:52 PM, Rahul Bhojwani <
> rahulbhojwani2...@gmail.com> wrote:
>
>> Hey, My question is for this situation:
>> Suppose we have 10 files each containing list of features in each row.
>>
>> Task is that for each file cluster the features in that file and write
>> the corresponding cluster along with it in a new file.  So we have to
>> generate 10 more files by applying clustering in each file
>> individually.
>>
>> So can I do it this way, that get rdd of list of files and apply map.
>> Inside the mapper function which will be handling each file, get another
>> spark context and use Mllib kmeans to get the clustered output file.
>>
>> Please suggest the appropriate method to tackle this problem.
>>
>> Thanks,
>> Rahul Kumar Bhojwani
>> 3rd year, B.Tech
>> Computer Science Engineering
>> National Institute Of Technology, Karnataka
>> 9945197359
>>
>
>
>
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: All of the tasks have been completed but the Stage is still shown as "Active"?

2014-07-10 Thread Daniel Siegmann
One thing to keep in mind is that the progress bar doesn't take into
account tasks which are rerun. If you see 4/4 but the stage is still
active, click the stage name and look at the task list. That will show you
if any are actually running. When rerun tasks complete, it can result in
the number of successful tasks being greater than the number of total
tasks; e.g. the progress bar might display 5/4.

Another bug is that a stage might complete and be moved to the completed
list, but if tasks are then rerun it will appear in both the completed and
active stages list. If it completes again, you will see that stage *twice*
in the completed stages list.

Of course, you should only be seeing this behavior if things are going
wrong; a node failing, for example.


On Thu, Jul 10, 2014 at 4:21 AM, Haopu Wang  wrote:

> I'm running an App for hours in a standalone cluster. From the data
> injector and "Streaming" tab of web ui, it's running well.
>
> However, I see quite a lot of Active stages in web ui even some of them
> have all of their tasks completed.
>
> I attach a screenshot for your reference.
>
> Do you ever see this kind of behavior?
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io

