Re: GroupBy and Spark Performance issue

2017-01-17 Thread Andy Dang
Repartitioning wouldn't save you from skewed data, unfortunately. The way Spark
works now, it pulls all the data for a given key into one single partition,
and Spark, AFAIK, retains the mapping from key to data in memory.

You can use aggregateByKey(), combineByKey(), or reduceByKey() to avoid
this problem, because these functions can be evaluated using map-side
aggregation:
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
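
For illustration, a minimal sketch (made-up data, not your actual job) of a
per-key max with reduceByKey - each partition collapses its own records for a
hot key before anything is shuffled:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("max-per-key"))
val pairs = sc.parallelize(Seq(("a", 3), ("b", 7), ("a", 9), ("b", 1)))

// Each partition first reduces its own ("a", ...) records to a single local max,
// so only one value per key per partition crosses the network.
val maxPerKey = pairs.reduceByKey(math.max(_, _))
maxPerKey.collect().foreach(println) // (a,9), (b,7)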


---
Regards,
Andy

On Tue, Jan 17, 2017 at 5:39 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am trying to group data in Spark and find the maximum value for each group
> of data. I have to use group by as I need to transpose based on the values.
>
> I tried to repartition the data by increasing the number from 1 to 1. The job
> runs till the below stage and then takes a long time to move ahead. I was never
> successful; the job gets killed after some time with GC overhead limit issues.
>
>
> [image: Inline image 1]
>
> I increased memory limits too. Not sure what is going wrong; can anyone
> guide me through the right approach?
>
> Thanks,
> Asmath
>


Re: groupByKey vs mapPartitions for efficient grouping within a Partition

2017-01-16 Thread Andy Dang
groupByKey() is a wide dependency and will cause a full shuffle. It's
advised against using this transformation unless your keys are balanced
(well-distributed) and you actually need a full shuffle.

Otherwise, what you want is aggregateByKey() or reduceByKey() (depending on
the output). These transformations are backed by combineByKey(), which can
perform map-side aggregation.
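
For example, a rough sketch (made-up numbers) of a per-key average with
aggregateByKey - only a small (sum, count) buffer per key per partition gets
shuffled, instead of every value:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("avg-per-key"))
val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 10.0)))

val sumCount = pairs.aggregateByKey((0.0, 0L))(
  (acc, v) => (acc._1 + v, acc._2 + 1L),  // seqOp: runs map-side, inside each partition
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merges the small buffers after the shuffle
)

sumCount.mapValues { case (sum, count) => sum / count }
  .collect()
  .foreach(println) // (a,2.0), (b,10.0)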

---
Regards,
Andy

On Mon, Jan 16, 2017 at 2:21 PM, Patrick  wrote:

> Hi,
>
> Does groupByKey have any intelligence associated with it, such that if all
> the keys reside in the same partition, it does not do the shuffle?
>
> Or should the user write mapPartitions (with Scala groupBy code)?
>
> Which would be more efficient and what are the memory considerations?
>
>
> Thanks
>
>
>
>


Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi Takeshi,

Thanks for the answer. My UDAF aggregates data into an array of rows.

Apparently this makes it ineligible for hash-based aggregation, based on
the logic at:
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java#L74
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L108

The list of supported data types is VERY limited, unfortunately.

It doesn't make sense to me that the data types must be mutable for the UDAF to
use a hash-based aggregate, but I could be missing something here :). I could
get a hash-based aggregate by dropping down to the RDD API for this query, but
that is counter-intuitive IMO.
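
For anyone curious, the eligibility check itself can be probed directly - this
is internal API, so treat the snippet as a sketch against the 2.x sources linked
above rather than anything stable:

import org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
import org.apache.spark.sql.types._

// A fixed-width buffer (all mutable types) is eligible for hash-based aggregation...
val fixedWidthBuffer = new StructType().add("count", LongType).add("sum", DoubleType)
println(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(fixedWidthBuffer))  // true

// ...while a buffer holding an array of rows is not, which forces SortAggregate.
val arrayOfRowsBuffer = new StructType()
  .add("rows", ArrayType(new StructType().add("key", StringType)))
println(UnsafeFixedWidthAggregationMap.supportsAggregationBufferSchema(arrayOfRowsBuffer)) // false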

---
Regards,
Andy

On Mon, Jan 9, 2017 at 2:05 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> Spark always uses hash-based aggregates if the types of the aggregated data
> are supported there; otherwise it falls back to sort-based ones.
> See: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38
>
> So, I'm not sure about your query, but it seems the types of the aggregated
> data in your query are not supported by hash-based aggregates.
>
> // maropu
>
>
>
> On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> Hi all,
>>
>> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
>> which is very inefficient for certain aggregations:
>>
>> The code is very simple:
>> - I have a UDAF
>> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>>
>> The physical plan I got was:
>> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
>> +- Exchange SinglePartition
>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
>>       +- *Project
>>          +- Generate explode(internal_col#31), false, false, [internal_col#42]
>>             +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>>                +- *Sort [key#0 ASC], false, 0
>>                   +- Exchange hashpartitioning(key#0, 200)
>>                      +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
>>                         +- *Sort [key#0 ASC], false, 0
>>                            +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]
>>
>> How can I make Spark use HashAggregate (like the count(*) expression)
>> instead of SortAggregate with my UDAF?
>>
>> Is it intentional? Is there an issue tracking this?
>>
>> ---
>> Regards,
>> Andy
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Andy Dang
Hi all,

It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
which is very inefficient for certain aggregations:

The code is very simple:
- I have a UDAF
- What I want to do is: dataset.groupBy(cols).agg(udaf).count()

The physical plan I got was:
*HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#71L])
      +- *Project
         +- Generate explode(internal_col#31), false, false, [internal_col#42]
            +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
               +- *Sort [key#0 ASC], false, 0
                  +- Exchange hashpartitioning(key#0, 200)
                     +- SortAggregate(key=[key#0], functions=[partial_aggregatefunction(key#0, nested#1, nestedArray#2, nestedObjectArray#3, value#4L, com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
                        +- *Sort [key#0 ASC], false, 0
                           +- Scan ExistingRDD[key#0,nested#1,nestedArray#2,nestedObjectArray#3,value#4L]

How can I make Spark use HashAggregate (like the count(*) expression)
instead of SortAggregate with my UDAF?

Is it intentional? Is there an issue tracking this?

---
Regards,
Andy


Converting an InternalRow to a Row

2017-01-04 Thread Andy Dang
Hi all,
(cc-ing dev since I've hit some developer API corner)

What's the best way to convert an InternalRow to a Row, if I've got an
InternalRow and the corresponding schema?

Code snippet:
@Test
public void foo() throws Exception {
    Row row = RowFactory.create(1);
    StructType struct = new StructType().add("id", DataTypes.IntegerType);
    ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
    InternalRow internalRow = encoder.toRow(row);
    System.out.println("Internal row size: " + internalRow.numFields());
    Row roundTrip = encoder.fromRow(internalRow);
    System.out.println("Round trip: " + roundTrip.size());
}

The code fails at the line encoder.fromRow() with the exception:
> Caused by: java.lang.UnsupportedOperationException: Cannot evaluate
expression: getcolumnbyordinal(0, IntegerType)
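
For reference, a rough Scala sketch of what I think the round trip should look
like - my (untested) assumption is that the deserializer side of the encoder
still has to be resolved and bound to the schema's attributes before fromRow()
can be evaluated:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val schema = new StructType().add("id", IntegerType)
val encoder = RowEncoder(schema)

val internalRow = encoder.toRow(Row(1))
val roundTrip = encoder.resolveAndBind().fromRow(internalRow) // bind first, then deserialize
println(roundTrip) // [1]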

---
Regards,
Andy


Re: top-k function for Window

2017-01-03 Thread Andy Dang
Hi Austin,

It's trivial to implement top-k in the RDD world - however, I would like to
stay in the Dataset API world instead of flip-flopping between the two APIs
(consistency, whole-stage codegen, etc.).

The Twitter library appears to support only RDDs, and the solution you gave
me is very similar to what I did - it doesn't work very well with a skewed
dataset :) (it has to perform the sort to work out the row number).

I've been toying with the UDAF idea, but the more code I write, the deeper I
see myself digging into developer-API land - not very ideal, to be honest.
Also, UDAFs don't have any concept of sorting, so it gets messy really fast.

---
Regards,
Andy

On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L <ah6...@att.com> wrote:

> Andy,
>
>
>
> You might also want to check out the Algebird libraries from Twitter. They
> have topK and a lot of other helpful functions. I’ve used the Algebird topK
> successfully on very large data sets.
>
>
>
> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
> scrupulous you are about your TopKs (I can expound on this, if needed).
>
>
>
> I obfuscated the field names, before pasting this into email – I think I
> got them all consistently.
>
>
>
> Here’s the meat of the TopK part (found on SO, but I don’t have a
> reference) – this one takes the top 4, hence “rowNum <= 4”:
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        incomingCount
>   FROM (SELECT time_bucket,
>                identifier1,
>                identifier2,
>                incomingCount,
>                ROW_NUMBER() OVER (PARTITION BY time_bucket,
>                                                identifier1
>                                       ORDER BY incomingCount DESC) AS rowNum
>           FROM tablename) tmp
>  WHERE rowNum <= 4
>  ORDER BY time_bucket, identifier1, rowNum
>
> The count and order by:
>
> SELECT time_bucket,
>        identifier1,
>        identifier2,
>        count(identifier2) as myCount
>   FROM table
>  GROUP BY time_bucket,
>           identifier1,
>           identifier2
>  ORDER BY time_bucket,
>           identifier1,
>           count(identifier2) DESC
>
> *From: *Andy Dang <nam...@gmail.com>
> *Date: *Tuesday, January 3, 2017 at 7:06 AM
> *To: *user <user@spark.apache.org>
> *Subject: *top-k function for Window
>
>
>
> Hi all,
>
>
>
> What's the best way to do top-k with Windowing in Dataset world?
>
>
>
> I have a snippet of code that filters the data to the top-k, but with
> skewed keys:
>
>
>
> val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
> val rank = row_number().over(windowSpec)
>
> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>
>
>
> The problem with this code is that Spark doesn't know that it can sort the
> data locally and get the local rank first. What it ends up doing is performing
> a sort by key using the skewed keys, and this blew up the cluster since the
> keys are heavily skewed.
>
> In the RDD world we can do something like:
>
> rdd.mapPartitions(iterator -> topK(iterator))
>
> but I can't really think of an obvious way to do this in the Dataset API,
> especially with Window functions. I guess some UserDefinedAggregateFunction
> would do, but I wonder if there's an obvious way that I missed.
>
>
>
> ---
> Regards,
> Andy
>


top-k function for Window

2017-01-03 Thread Andy Dang
Hi all,

What's the best way to do top-k with Windowing in Dataset world?

I have a snippet of code that filters the data to the top-k, but with
skewed keys:

val windowSpec = Window.partitionBy(skewedKeys).orderBy(dateTime)
val rank = row_number().over(windowSpec)

input.withColumn("rank", rank).filter("rank <= 10").drop("rank")

The problem with this code is that Spark doesn't know that it can sort the
data locally and get the local rank first. What it ends up doing is performing
a sort by key using the skewed keys, and this blew up the cluster since the
keys are heavily skewed.

In the RDD world we can do something like:
rdd.mapPartitions(iterator -> topK(iterator))
but I can't really think of an obvious way to do this in the Dataset API,
especially with Window functions. I guess some UserDefinedAggregateFunction
would do, but I wonder if there's an obvious way that I missed.
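
For concreteness, the two-phase version I keep sketching looks roughly like this
(untested; it assumes the window is partitionBy(skewedKey).orderBy(dateTime),
that dateTime is a timestamp, that each partition fits in memory, and the column
names are illustrative):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def topKPerKey(input: DataFrame, k: Int): DataFrame = {
  implicit val enc = RowEncoder(input.schema)
  val keyIdx = input.schema.fieldIndex("skewedKey")
  val timeIdx = input.schema.fieldIndex("dateTime")

  // Phase 1: inside each partition keep at most k rows per key - no shuffle yet.
  val locallyPruned = input.mapPartitions { rows =>
    rows.toSeq
      .groupBy(_.get(keyIdx))
      .valuesIterator
      .flatMap(_.sortBy(_.getAs[java.sql.Timestamp](timeIdx).getTime).take(k))
  }

  // Phase 2: the usual window ranking, now over far fewer rows per key.
  val w = Window.partitionBy(col("skewedKey")).orderBy(col("dateTime"))
  locallyPruned.withColumn("rank", row_number().over(w)).filter(col("rank") <= k).drop("rank")
}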

---
Regards,
Andy


Re: Best Practice for Spark Job Jar Generation

2016-12-23 Thread Andy Dang
We manage the Spark dependencies and our own together and put them under the
/jars path. There are other ways to do it, but we want the classpath to be
strictly as close to development as possible.
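
Concretely, it ends up looking something like this (paths and class names are
made-up placeholders) - the job jar stays thin and the libraries either sit in
the distribution's jars/ directory or ride along via --jars:

bin/spark-submit \
  --class com.example.MySparkJob \
  --jars /opt/deps/guava-18.0.jar,/opt/deps/jackson-databind-2.6.5.jar \
  /opt/jobs/my-spark-job-1.0.jar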

---
Regards,
Andy

On Fri, Dec 23, 2016 at 6:00 PM, Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Andy, thanks for the reply.
>
> If we download all the dependencies to a separate location and link them with
> the Spark job jar on the Spark cluster, is that the best way to execute a Spark job?
>
> Thanks.
>
> On Fri, Dec 23, 2016 at 8:34 PM, Andy Dang <nam...@gmail.com> wrote:
>
>> I used to use an uber jar in Spark 1.x because of classpath issues (we
>> couldn't re-model our dependencies based on our code, and thus the cluster's
>> runtime dependencies could be very different from running Spark directly in
>> the IDE). We had to use the userClasspathFirst "hack" to work around this.
>>
>> With Spark 2, it's easier to replace dependencies (say, Guava) than
>> before. We moved away from deploying a superjar and just pass the libraries
>> as part of the Spark jars (we still can't use Guava v19 or later because Spark
>> uses a deprecated method that's no longer available there, but that's not a
>> big issue for us).
>>
>> ---
>> Regards,
>> Andy
>>
>> On Fri, Dec 23, 2016 at 6:44 AM, Chetan Khatri <
>> chetan.opensou...@gmail.com> wrote:
>>
>>> Hello Spark Community,
>>>
>>> For Spark job creation I use sbt-assembly to build an uber ("super") jar and
>>> then submit it via spark-submit.
>>>
>>> Example,
>>>
>>> bin/spark-submit --class hbase.spark.chetan.com.SparkHbaseJob
>>> /home/chetan/hbase-spark/SparkMSAPoc-assembly-1.0.jar
>>>
>>> But other folks have argued for an uber-less jar. Can you please explain
>>> the best-practice industry standard for this?
>>>
>>> Thanks,
>>>
>>> Chetan Khatri.
>>>
>>
>>
>


Re: Best Practice for Spark Job Jar Generation

2016-12-23 Thread Andy Dang
I used to use an uber jar in Spark 1.x because of classpath issues (we
couldn't re-model our dependencies based on our code, and thus the cluster's
runtime dependencies could be very different from running Spark directly in
the IDE). We had to use the userClasspathFirst "hack" to work around this.

With Spark 2, it's easier to replace dependencies (say, Guava) than before.
We moved away from deploying a superjar and just pass the libraries as part
of the Spark jars (we still can't use Guava v19 or later because Spark uses a
deprecated method that's no longer available there, but that's not a big issue
for us).

---
Regards,
Andy

On Fri, Dec 23, 2016 at 6:44 AM, Chetan Khatri 
wrote:

> Hello Spark Community,
>
> For Spark job creation I use sbt-assembly to build an uber ("super") jar and
> then submit it via spark-submit.
>
> Example,
>
> bin/spark-submit --class hbase.spark.chetan.com.SparkHbaseJob
> /home/chetan/hbase-spark/SparkMSAPoc-assembly-1.0.jar
>
> But other folks have argued for an uber-less jar. Can you please explain
> the best-practice industry standard for this?
>
> Thanks,
>
> Chetan Khatri.
>


Re: Do you use spark 2.0 in work?

2016-10-31 Thread Andy Dang
This is my personal email so I can't exactly discuss work-related topics.

But yes, many teams in my company use Spark 2.0 in a production environment.
What are the challenges that prevent you from adopting it (besides the
migration from Spark 1.x)?

---
Regards,
Andy

On Mon, Oct 31, 2016 at 8:16 AM, Yang Cao  wrote:

> Hi guys,
>
> Just out of personal interest, I wonder whether Spark 2.0 is a production-ready
> version? Is there any company using this version as its main version in daily
> work? THX
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: an OOM while persist as DISK_ONLY

2016-03-03 Thread Andy Dang
Spark's shuffling algorithm is very aggressive in storing everything in RAM,
and the behavior is worse in 1.6 with the new unified memory management. At
least in previous versions you could limit the shuffle memory, but Spark 1.6
will use as much memory as it can get. What I see is that Spark seems to
underestimate the amount of memory that objects take up, and thus doesn't
spill frequently enough. There's a dirty workaround (legacy mode), but the
common advice is to increase your parallelism (and keep in mind that
operations such as join have implicit parallelism, so you'll want to be
explicit about it).
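
Roughly, the knobs look like this - config names are from the 1.6 docs, the
values are placeholders rather than recommendations, and the RDDs are made up:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("legacy-shuffle-knobs")
  .set("spark.memory.useLegacyMode", "true")   // opt back into the pre-1.6 memory manager
  .set("spark.shuffle.memoryFraction", "0.2")  // shuffle memory cap; only honoured in legacy mode
val sc = new SparkContext(conf)

// Be explicit about parallelism on wide operations instead of trusting the default:
val left = sc.parallelize(Seq(("k", 1)))
val right = sc.parallelize(Seq(("k", 2)))
val joined = left.join(right, 2000)            // the partition count here is just an example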

---
Regards,
Andy

On Mon, Feb 22, 2016 at 2:12 PM, Alex Dzhagriev  wrote:

> Hello all,
>
> I'm using spark 1.6 and trying to cache a dataset which is 1.5 TB, I have
> only ~800GB RAM  in total, so I am choosing the DISK_ONLY storage level.
> Unfortunately, I'm getting out of the overhead memory limit:
>
>
> Container killed by YARN for exceeding memory limits. 27.0 GB of 27 GB 
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
>
> I'm giving 6GB overhead memory and using 10 cores per executor.
> Apparently, that's not enough. Without persisting the data and later
> computing the dataset (twice in my case) the job works fine. Can anyone,
> please, explain what is the overhead which consumes that much memory during
> persist to the disk and how can I estimate what extra memory should I give
> to the executors in order to make it not fail?
>
> Thanks, Alex.
>


Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-24 Thread Andy Dang
If you execute the collect steps (foreach in 1, possibly reduce in 2) in two
threads in the driver, then both of them will be executed in parallel.
Whichever gets submitted to Spark first gets executed first - you can use a
semaphore if you need to enforce the ordering of execution, though I would
assume that the ordering wouldn't matter.
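
As a bare-bones sketch of the two-threads-in-the-driver idea with plain actions
(the RDD, output path, and operations below are made up):

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val sc = new SparkContext(new SparkConf().setAppName("parallel-actions"))
val x = sc.parallelize(1 to 1000)

// "Function A": transform/filter, then write out - submitted from its own thread.
val jobA = Future { x.filter(_ % 2 == 0).map(_ * 10).saveAsTextFile("/tmp/z-output") }
// "Function B": a reduction such as the minimum, submitted concurrently.
val jobB = Future { x.min() }

Await.result(jobA, 10.minutes)
println(Await.result(jobB, 10.minutes))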

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
wrote:

> I wanted to understand something about the internals of spark streaming
> executions.
>
> If I have a stream X, and in my program I send stream X to function A and
> function B:
>
> 1. In function A, I do a few transform/filter operations etc. on X->Y->Z
> to create stream Z. Now I do a forEach Operation on Z and print the output
> to a file.
>
> 2. Then in function B, I reduce stream X -> X2 (say min value of each
> RDD), and print the output to file
>
> Are both functions being executed for each RDD in parallel? How does it
> work?
>
> Thanks
> Nipun
>
>


Re: sc.parallelize with defaultParallelism=1

2015-09-30 Thread Andy Dang
Can't you just load the data from HBase first, and then call sc.parallelize
on your dataset?
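
Something along these lines - fetchSmallSliceFromHBase is a stand-in for
whatever HBase client call you'd run on the driver, so treat it as a sketch:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("small-hbase-slice"))

// Stand-in for a real HBase get/scan executed on the driver as plain JVM code.
def fetchSmallSliceFromHBase(): Seq[String] = Seq("row1", "row2")

val localRows = fetchSmallSliceFromHBase()         // data lives on the driver
val rdd = sc.parallelize(localRows, numSlices = 1) // shipped to an executor only when an action runs
println(rdd.map(_.length).sum())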

-Andy

---
Regards,
Andy (Nam) Dang

On Wed, Sep 30, 2015 at 12:52 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> When calling sc.parallelize(data,1), is there a preference where to put
> the data? I see 2 possibilities: sending it to a worker node, or keeping it
> on the driver program.
>
>
> I would prefer to keep the data local to the driver. The use case is when
> I just need to load a bit of data from HBase and then compute over it, e.g.
> aggregate, using Spark.
>
>
> Thanks,
>
> Nicu
>


Re: Combine key-value pair in spark java

2015-09-30 Thread Andy Dang
You should be able to use a simple mapping:

rdd.map(tuple -> tuple._1() + tuple._2())

---
Regards,
Andy (Nam) Dang

On Wed, Sep 30, 2015 at 10:34 AM, Ramkumar V 
wrote:

> Hi,
>
> I have a key-value pair JavaRDD (JavaPairRDD rdd), but I
> want to concatenate it into one RDD of String (JavaRDD result).
>
> How can I do that? What do I have to use (map, flatMap)? Can anyone please
> give me the syntax for this in Java?
>
> *Thanks*,
> 
>
>