Re: groupByKey() and keys with many values

2015-09-08 Thread Sean Owen
I think groupByKey is intended for cases where you do want the values
in memory; for one-pass use cases, it's more efficient to use
reduceByKey, or aggregateByKey if lower-level operations are needed.

For your case, you probably want to do your reduceByKey, then perform
the expensive per-key lookups once per key. You also probably want to
do this in foreachPartition, not foreach, in order to pay DB
connection costs just once per partition.
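
To make that concrete, here is a rough sketch of that pattern, assuming a
spark-shell `sc`; the RDD, the JDBC URL and the per-key write are placeholders,
not anything from your code:

```
// Sketch only: aggregate per key first, then do the expensive per-key work
// with a single DB connection per partition. All names are placeholders.
import java.sql.DriverManager

val pairs = sc.parallelize(Seq(("a", 1L), ("a", 2L), ("b", 3L)))  // stand-in RDD[(String, Long)]

val reduced = pairs.reduceByKey(_ + _)

reduced.foreachPartition { iter =>
  // One connection per partition, not one per record or per key.
  val conn = DriverManager.getConnection("jdbc:...")  // placeholder URL
  try {
    iter.foreach { case (key, agg) =>
      // The expensive per-key lookup and the single write for the key go here.
      println(s"$key -> $agg")  // replace with the real per-key DB work
    }
  } finally {
    conn.close()
  }
}
```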

On Tue, Sep 8, 2015 at 7:20 AM, kaklakariada  wrote:
> Hi Antonio!
>
> Thank you very much for your answer!
> You are right in that in my case the computation could be replaced by a
> reduceByKey. The thing is that my computation also involves database
> queries:
>
> 1. Fetch key-specific data from database into memory. This is expensive and
> I only want to do this once for a key.
> 2. Process each value using this data and update the common data
> 3. Store modified data to database. Here it is important to write all data
> for a key in one go.
>
> Is there a pattern for implementing something like this with reduceByKey?
>
> Out of curiosity: I understand why you want to discourage people from using
> groupByKey. But is there a technical reason why the Iterable is implemented
> the way it is?
>
> Kind regards,
> Christoph.
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/groupByKey-and-keys-with-many-values-tp13985p13992.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
>




adding jars to the classpath with the relative path to spark home

2015-09-08 Thread Niranda Perera
Hi,

Is it possible to add jars to the Spark executor/driver classpath using the
relative path of the jar (relative to the Spark home)?
I need to set the following settings in the spark conf
- spark.driver.extraClassPath
- spark.executor.extraClassPath

The reason I need to use the relative path is that otherwise, in a Spark
cluster, all the jars need to be kept at the same absolute path on every node.

I know we can pass the jars using the --jars option, but I'd prefer this
approach.
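
For context, a sketch of what I'm doing today (paths made up; these can also
live in spark-defaults.conf), where the absolute paths have to be identical on
every node -- what I'd like is for these to resolve relative to the Spark home:

```
// Made-up paths for illustration: today these must be absolute and present
// at the same location on every node in the cluster.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("classpath-example")
  .set("spark.driver.extraClassPath", "/opt/libs/my-dependency.jar")
  .set("spark.executor.extraClassPath", "/opt/libs/my-dependency.jar")
```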

cheers
-- 
Niranda
@n1r44 
https://pythagoreanscript.wordpress.com/


Re: groupByKey() and keys with many values

2015-09-08 Thread kaklakariada
Hi Antonio!

Thank you very much for your answer!
You are right in that in my case the computation could be replaced by a
reduceByKey. The thing is that my computation also involves database
queries:

1. Fetch key-specific data from database into memory. This is expensive and
I only want to do this once for a key.
2. Process each value using this data and update the common data
3. Store modified data to database. Here it is important to write all data
for a key in one go.

Is there a pattern for implementing something like this with reduceByKey?

Out of curiosity: I understand why you want to discourage people from using
groupByKey. But is there a technical reason why the Iterable is implemented
the way it is?

Kind regards,
Christoph.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/groupByKey-and-keys-with-many-values-tp13985p13992.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.




Re: Detecting configuration problems

2015-09-08 Thread Akhil Das
I found an old JIRA referring to the same issue.
https://issues.apache.org/jira/browse/SPARK-5421

Thanks
Best Regards

On Sun, Sep 6, 2015 at 8:53 PM, Madhu  wrote:

> I'm not sure if this has been discussed already, if so, please point me to
> the thread and/or related JIRA.
>
> I have been running with about 1TB volume on a 20 node D2 cluster (255
> GiB/node).
> I have uniformly distributed data, so skew is not a problem.
>
> I found that default settings (or wrong setting) for driver and executor
> memory caused out of memory exceptions during shuffle (subtractByKey to be
> exact). This was not easy to track down, for me at least.
>
> Once I bumped up driver to 12G and executor to 10G with 300 executors and
> 3000 partitions, shuffle worked quite well (12 mins for subtractByKey). I'm
> sure there are more improvements to be made, but it's a lot better than heap
> space exceptions!
>
> From my reading, the shuffle OOM problem is in ExternalAppendOnlyMap or
> similar disk backed collection.
> I have some familiarity with that code based on previous work with external
> sorting.
>
> Is it possible to detect misconfiguration that leads to these OOMs and
> produce more meaningful error messages? I think that would really help
> users who might not understand all the inner workings and configuration of
> Spark (myself included). As it is, heap space issues are a challenge and
> do not present Spark in a positive light.
>
> I can help with that effort if someone is willing to point me to the
> precise
> location of memory pressure during shuffle.
>
> Thanks!
>
>
>
> -
> --
> Madhu
> https://www.linkedin.com/in/msiddalingaiah
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Detecting-configuration-problems-tp13980.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


Re: Code generation for GPU

2015-09-08 Thread Steve Loughran

On 7 Sep 2015, at 20:44, lonikar wrote:


2. If the vectorization is difficult or a major effort, I am not sure how I
am going to implement even a glimpse of the changes I would like to. I think I
will have to be satisfied with only a partial effort. Batching rows defeats the
purpose, as I have found that it consumes a considerable amount of CPU cycles,
and producing one row at a time also takes away the performance benefit.
What's really required is to access a large partition and produce the result
partition in one shot.


why not look at the DataFrame APIs and the back-end implementations of the
things which support them? The data sources which are columnized from the
outset (ORC, Parquet) are the ones where vector operations work well: you can
read a set of columns, perform a parallel operation, then repeat.

If you can hook up to a column structure you may get that speedup.
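
Something along these lines (a sketch; the path and column names are invented)
keeps the work on the columnar path end to end:

```
// Sketch: keep the work in the DataFrame API so the columnar readers
// (Parquet/ORC) and the query planner do the heavy lifting.
// The path and column names are invented for illustration.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val sqlContext = new SQLContext(sc)                          // sc: existing SparkContext
val events = sqlContext.read.parquet("/data/events.parquet")

val totals = events
  .select(col("device"), (col("reading") * 2).as("scaled"))  // column-wise arithmetic
  .groupBy("device")
  .agg(sum("scaled").as("total"))

totals.show()
```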


I think I will have to severely limit the scope of my talk in that case. Or
re-orient it to propose the changes instead of presenting the results of
execution on GPU. Please suggest since you seem to have selected the talk.

It is always essential to have the core of your talk ready before you propose
the talk - it's something reviewers (nothing to do with me here) mostly expect.
Otherwise you are left in a panic three days beforehand, trying to bash together
some slides you will have to present to an audience that may include people
who know the code better than you. I've been there - and fear I will be there
again in three weeks' time.

Some general suggestions

  1.  Assume the audience knows Spark, but not how to code for GPUs: intro that
on a slide or two.
  2.  Cover the bandwidth problem: how much computation is needed before
working with the GPU is justified.
  3.  Look at the body of work on Hadoop MapReduce & GPUs and the limitations
(IO bandwidth, intermediate-stage bandwidth) as well as benefits (perf on CPU
workloads, power budget).
  4.  Cover how that's changing: SSDs, in-memory filesystems, whether
InfiniBand would help.
  5.  Try to demo something. It's always nice to show something working at a
talk, even if it's just your laptop.




Re: Deserializing JSON into Scala objects in Java code

2015-09-08 Thread Marcelo Vanzin
Hi Kevin,

How did you try to use the Scala module? Spark has this code when
setting up the ObjectMapper used to generate the output:

  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)

As for supporting direct serialization to Java objects, I don't think
that was the goal of the API. The Scala API classes are public mostly
so that API compatibility checks are performed against them. If you
don't mind the duplication, you could write your own Java POJOs that
mirror the Scala API, and use them to deserialize the JSON.
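
In case it helps narrow down what differs in your setup, this is the shape of
registration I mean on the reading side; the case classes below are simplified
stand-ins, not the real api.scala types, and I haven't tried this against them:

```
// Sketch: Jackson + the Scala module deserializing into a class with a Seq
// field. The classes are simplified stand-ins for ApplicationInfo et al.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class AttemptInfo(attemptId: String, completed: Boolean)
case class AppInfo(id: String, name: String, attempts: Seq[AttemptInfo])

object ReadJson {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)   // must be on the mapper doing the reading

    val json =
      """{"id":"app-1","name":"demo","attempts":[{"attemptId":"1","completed":true}]}"""

    val info = mapper.readValue(json, classOf[AppInfo])
    println(info.attempts.head.attemptId)       // prints "1"
  }
}
```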


On Tue, Sep 8, 2015 at 12:46 PM, Kevin Chen  wrote:
> Hello Spark Devs,
>
>  I am trying to use the new Spark API json endpoints at /api/v1/[path]
> (added in SPARK-3454).
>
>  In order to minimize maintenance on our end, I would like to use
> Retrofit/Jackson to parse the json directly into the Scala classes in
> org/apache/spark/status/api/v1/api.scala (ApplicationInfo,
> ApplicationAttemptInfo, etc…). However, Jackson does not seem to know how to
> handle Scala Seqs, and will throw an error when trying to parse the
> attempts: Seq[ApplicationAttemptInfo] field of ApplicationInfo. Our codebase
> is in Java.
>
>  My questions are:
>
> Do you have any recommendations on how to easily deserialize Scala objects
> from json? For example, do you have any current usage examples of SPARK-3454
> with Java?
> Alternatively, are you committed to the json formats of /api/v1/path? I
> would guess so, because of the ‘v1’, but wanted to confirm. If so, I could
> deserialize the json into instances of my own Java classes instead, without
> worrying about changing the class structure later due to changes in the
> Spark API.
>
> Some further information:
>
> The error I am getting with Jackson when trying to deserialize the json into
> ApplicationInfo is Caused by:
> com.fasterxml.jackson.databind.JsonMappingException: Can not construct
> instance of scala.collection.Seq, problem: abstract types either need to be
> mapped to concrete types, have custom deserializer, or be instantiated with
> additional type information
> I tried using Jackson’s DefaultScalaModule, which seems to have support for
> Scala Seqs, but had no luck.
> Deserialization works if the Scala class does not have any Seq fields, and
> works if the fields are Java Lists instead of Seqs.
>
> Thanks very much for your help!
> Kevin Chen
>



-- 
Marcelo




Re: Fast Iteration while developing

2015-09-08 Thread Michael Armbrust
+1 to Reynold's suggestion.  This is probably the fastest way to iterate.

Another option for more ad-hoc debugging is `sbt/sbt sparkShell` which is
similar to bin/spark-shell but doesn't require you to rebuild the assembly
jar.

On Mon, Sep 7, 2015 at 9:03 PM, Reynold Xin  wrote:

> I usually write a test case for what I want to test, and then run
>
> sbt/sbt "~module/test:test-only *MyTestSuite"
>
>
>
> On Mon, Sep 7, 2015 at 6:02 PM, Justin Uang  wrote:
>
>> Hi,
>>
>> What is the normal workflow for the core devs?
>>
>> - Do we need to build the assembly jar to be able to run it from the
>> spark repo?
>> - Do you use sbt or maven to do the build?
>> - Is zinc only usable for maven?
>>
>> I'm asking because the current process I have right now is to do sbt
>> build, which means I'm stuck with about a 3-5 minute iteration cycle.
>>
>> Thanks!
>>
>> Justin
>>
>
>


Deserializing JSON into Scala objects in Java code

2015-09-08 Thread Kevin Chen
Hello Spark Devs,

 I am trying to use the new Spark API json endpoints at /api/v1/[path]
(added in SPARK-3454).

 In order to minimize maintenance on our end, I would like to use
Retrofit/Jackson to parse the json directly into the Scala classes in
org/apache/spark/status/api/v1/api.scala (ApplicationInfo,
ApplicationAttemptInfo, etc…). However, Jackson does not seem to know how to
handle Scala Seqs, and will throw an error when trying to parse the
attempts: Seq[ApplicationAttemptInfo] field of ApplicationInfo. Our codebase
is in Java.

 My questions are:
1. Do you have any recommendations on how to easily deserialize Scala
objects from json? For example, do you have any current usage examples of
SPARK-3454 with Java?
2. Alternatively, are you committed to the json formats of /api/v1/path? I
would guess so, because of the ‘v1’, but wanted to confirm. If so, I could
deserialize the json into instances of my own Java classes instead, without
worrying about changing the class structure later due to changes in the
Spark API.
Some further information:
* The error I am getting with Jackson when trying to deserialize the json
into ApplicationInfo is Caused by:
com.fasterxml.jackson.databind.JsonMappingException: Can not construct
instance of scala.collection.Seq, problem: abstract types either need to be
mapped to concrete types, have custom deserializer, or be instantiated with
additional type information
* I tried using Jackson’s DefaultScalaModule, which seems to have support
for Scala Seqs, but had no luck.
* Deserialization works if the Scala class does not have any Seq fields, and
works if the fields are Java Lists instead of Seqs.
Thanks very much for your help!
Kevin Chen







Re: Pyspark DataFrame TypeError

2015-09-08 Thread Davies Liu
I tried with Python 2.7/3.4 and Spark 1.4.1/1.5-RC3, they all work as expected:

```
>>> from pyspark.mllib.linalg import Vectors
>>> df = sqlContext.createDataFrame([(1.0, Vectors.dense([1.0])),
...                                  (0.0, Vectors.sparse(1, [], []))],
...                                 ["label", "featuers"])
>>> df.show()
+-----+---------+
|label| featuers|
+-----+---------+
|  1.0|    [1.0]|
|  0.0|(1,[],[])|
+-----+---------+

>>> df.columns
['label', 'featuers']
```

On Tue, Sep 8, 2015 at 1:45 AM, Prabeesh K.  wrote:
> I am trying to run the code RandomForestClassifier example in the PySpark
> 1.4.1 documentation,
> https://spark.apache.org/docs/1.4.1/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier.
>
> Below is a screenshot of the IPython notebook.
>
>
>
> But for df.columns, it shows the following error.
>
>
> TypeError Traceback (most recent call last)
> <ipython-input-…> in <module>()
> ----> 1 df.columns
>
> /home/datasci/src/spark/python/pyspark/sql/dataframe.pyc in columns(self)
> 484 ['age', 'name']
> 485 """
> --> 486 return [f.name for f in self.schema.fields]
> 487
> 488 @ignore_unicode_prefix
>
> /home/datasci/src/spark/python/pyspark/sql/dataframe.pyc in schema(self)
> 194 """
> 195 if self._schema is None:
> --> 196 self._schema =
> _parse_datatype_json_string(self._jdf.schema().json())
> 197 return self._schema
> 198
>
> /home/datasci/src/spark/python/pyspark/sql/types.pyc in
> _parse_datatype_json_string(json_string)
> 519 >>> check_datatype(structtype_with_udt)
> 520 """
> --> 521 return _parse_datatype_json_value(json.loads(json_string))
> 522
> 523
>
> /home/datasci/src/spark/python/pyspark/sql/types.pyc in
> _parse_datatype_json_value(json_value)
> 539 tpe = json_value["type"]
> 540 if tpe in _all_complex_types:
> --> 541 return _all_complex_types[tpe].fromJson(json_value)
> 542 elif tpe == 'udt':
> 543 return UserDefinedType.fromJson(json_value)
>
> /home/datasci/src/spark/python/pyspark/sql/types.pyc in fromJson(cls, json)
> 386 @classmethod
> 387 def fromJson(cls, json):
> --> 388 return StructType([StructField.fromJson(f) for f in
> json["fields"]])
> 389
> 390
>
> /home/datasci/src/spark/python/pyspark/sql/types.pyc in fromJson(cls, json)
> 347 def fromJson(cls, json):
> 348 return StructField(json["name"],
> --> 349_parse_datatype_json_value(json["type"]),
> 350json["nullable"],
> 351json["metadata"])
>
> /home/datasci/src/spark/python/pyspark/sql/types.pyc in
> _parse_datatype_json_value(json_value)
> 541 return _all_complex_types[tpe].fromJson(json_value)
> 542 elif tpe == 'udt':
> --> 543 return UserDefinedType.fromJson(json_value)
> 544 else:
> 545 raise ValueError("not supported type: %s" % tpe)
>
> /home/datasci/src/spark/python/pyspark/sql/types.pyc in fromJson(cls, json)
> 453 pyModule = pyUDT[:split]
> 454 pyClass = pyUDT[split+1:]
> --> 455 m = __import__(pyModule, globals(), locals(), [pyClass])
> 456 UDT = getattr(m, pyClass)
> 457 return UDT()
>
> TypeError: Item in ``from list'' not a string
>
>
>
>
>




Re: groupByKey() and keys with many values

2015-09-08 Thread Reynold Xin
On Tue, Sep 8, 2015 at 6:51 AM, Antonio Piccolboni 
wrote:

> As far as the DB writes, remember Spark can retry a computation, so your
> writes have to be idempotent (see this thread, in which Reynold is a bit
> more optimistic about failures than I am comfortable with, but who am I to
> question Reynold?)
>

I'm wrong all the time so please do question me :)

One thing is that apps should be using something like an output committer
to enforce idempotency. Maybe that's some API we can provide in Spark
itself to make it easier to write applications.
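
For the DB writes discussed in this thread, idempotent means something like a
keyed upsert, so a retried task rewrites the same rows rather than appending
duplicates. A rough sketch (the RDD, JDBC URL and table are placeholders; the
upsert syntax shown is MySQL-style):

```
// Sketch of an idempotent write: an upsert keyed on the primary key, so a
// retried task overwrites the same row instead of inserting a duplicate.
import java.sql.DriverManager

results.foreachPartition { iter =>                     // results: RDD[(String, Long)], assumed
  val conn = DriverManager.getConnection("jdbc:...")   // placeholder URL
  try {
    val stmt = conn.prepareStatement(
      "INSERT INTO totals (k, total) VALUES (?, ?) " +
      "ON DUPLICATE KEY UPDATE total = VALUES(total)") // adjust for your database
    iter.foreach { case (key, total) =>
      stmt.setString(1, key)
      stmt.setLong(2, total)
      stmt.executeUpdate()
    }
    stmt.close()
  } finally {
    conn.close()
  }
}
```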


Re: Detecting configuration problems

2015-09-08 Thread Madhu
Thanks Akhil!

I suspect the root cause of the shuffle OOM I was seeing (and probably of many
that users might see) is individual partitions on the reduce side not fitting
in memory. As a guideline, I was thinking of something like "be sure that your
largest partitions occupy no more than 1% of executor memory" or something to
that effect. I can add that documentation to the tuning page if someone can
suggest the best wording and numbers. I can also add a simple Spark shell
example to estimate the largest partition size to determine executor memory
and number of partitions.
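
Roughly what I have in mind for the shell example (a sketch; the per-record
byte estimate is something you'd plug in yourself, e.g. after caching a sample
and checking the Storage tab):

```
// Rough sketch: find the most heavily loaded partition by record count,
// then apply a per-record size estimate to gauge whether it fits in memory.
val rdd = sc.parallelize(1 to 1000000, 100)       // stand-in for the real data

val recordsPerPartition = rdd
  .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  .collect()

val (worstPartition, maxRecords) = recordsPerPartition.maxBy(_._2)
val estBytesPerRecord = 100L                      // plug in your own estimate
println(s"partition $worstPartition: $maxRecords records, " +
  s"~${maxRecords * estBytesPerRecord / (1024 * 1024)} MB")
```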

One more question: I'm trying to get my head around the shuffle code. I see
ShuffleManager, but that seems to be on the reduce side. Where is the code
driving the map side writes and reduce reads? I think it is possible to add
up reduce side volume for a key (they are byte reads at some point) and
raise an alarm if it's getting too high. Even a warning on the console would
be better than a catastrophic OOM.



-
--
Madhu
https://www.linkedin.com/in/msiddalingaiah
--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Detecting-configuration-problems-tp13980p13998.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.




Re: groupByKey() and keys with many values

2015-09-08 Thread Antonio Piccolboni
You may also consider selecting the distinct keys and fetching from the database
first, then joining with the values on key. This is for the case where Sean's
approach is not viable -- when you need to have the DB data before the first
reduce call. By not revealing your problem, you are forcing us to make guesses,
which are less useful.

Imagine you want to compute a binning of the values on a per-key basis, where
the bin definitions are in the database and the reduce would update counts per
bin. You could let the reduce initialize the bin counts from the DB when they
are empty, but that results in multiple database accesses and connections per
key, and the higher the degree of parallelism, the bigger the cost (see this
elementary example), which is something you should avoid if you want to write
code with some durability to it. If you use the join approach, you select the
keys, make them unique, and perform the database access once to obtain the bin
definitions. Then you join the data with the bin definitions on key and pass
the result through a reduceByKey to update the bin counts.

A different application: you want to compute max/min values per key, compare
them with the previously recorded max/min, and store the overall max/min. In
that case you don't need the database values during the reduce at all; you
just fetch them in the foreachPartition, before each write.
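
To make the binning example concrete, the join variant looks roughly like this
(a sketch; fetchBinDefs and the bin boundaries stand in for the real database
lookup):

```
// Sketch of the join approach: fetch per-key data from the DB once for the
// distinct keys (on the driver, assuming that set is small), join it back
// onto the values, then reduceByKey to update the bin counts.
def fetchBinDefs(keys: Array[String]): Map[String, Array[Double]] =
  keys.map(k => k -> Array(0.0, 10.0, 100.0)).toMap   // stand-in for the DB lookup

val data = sc.parallelize(Seq(("a", 3.0), ("a", 42.0), ("b", 7.0)))  // (key, value)

val binDefs = sc.parallelize(fetchBinDefs(data.keys.distinct().collect()).toSeq)

val binCounts = data.join(binDefs)                 // (key, (value, binBoundaries))
  .map { case (key, (value, bounds)) =>
    val bin = bounds.indexWhere(value <= _) match {
      case -1 => bounds.length                     // above the last boundary
      case i  => i
    }
    ((key, bin), 1L)
  }
  .reduceByKey(_ + _)                              // counts per (key, bin)
```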

As far as the DB writes, remember Spark can retry a computation, so your
writes have to be idempotent (see this thread, in which Reynold is a bit more
optimistic about failures than I am comfortable with, but who am I to question
Reynold?)






On Tue, Sep 8, 2015 at 12:53 AM Sean Owen  wrote:

> I think groupByKey is intended for cases where you do want the values
> in memory; for one-pass use cases, it's more efficient to use
> reduceByKey, or aggregateByKey if lower-level operations are needed.
>
> For your case, you probably want to do your reduceByKey, then perform
> the expensive per-key lookups once per key. You also probably want to
> do this in foreachPartition, not foreach, in order to pay DB
> connection costs just once per partition.
>
> On Tue, Sep 8, 2015 at 7:20 AM, kaklakariada 
> wrote:
> > Hi Antonio!
> >
> > Thank you very much for your answer!
> > You are right in that in my case the computation could be replaced by a
> > reduceByKey. The thing is that my computation also involves database
> > queries:
> >
> > 1. Fetch key-specific data from database into memory. This is expensive
> and
> > I only want to do this once for a key.
> > 2. Process each value using this data and update the common data
> > 3. Store modified data to database. Here it is important to write all
> data
> > for a key in one go.
> >
> > Is there a pattern for implementing something like this with reduceByKey?
> >
> > Out of curiosity: I understand why you want to discourage people from
> using
> > groupByKey. But is there a technical reason why the Iterable is
> implemented
> > the way it is?
> >
> > Kind regards,
> > Christoph.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/groupByKey-and-keys-with-many-values-tp13985p13992.html
> > Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
> >
> >
>
>
>