Re: partitionBy causing OOM

2017-09-25 Thread ayan guha
Another option would be to create a partitioned table in Hive and use
dynamic partitioning when inserting. That would not require Spark to do an
explicit partitionBy.
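
A minimal PySpark sketch of the dynamic-partitioning route, for illustration
only. The helper name, the table and view names, and the partition column
`event_date` are hypothetical; the two SET statements are the usual Hive
dynamic-partition switches. The thread does not confirm that this avoids the
OOM.

```python
def insert_with_dynamic_partitions(spark, source_view, target_table):
    """Insert rows from source_view into a Hive table partitioned by event_date.

    Assumes `spark` is a Hive-enabled SparkSession, that target_table already
    exists and is partitioned by `event_date`, and that `event_date` is the
    last column of source_view (all of these names are hypothetical).
    """
    statements = [
        # Allow Hive-style dynamic partitioning for this session.
        "SET hive.exec.dynamic.partition=true",
        "SET hive.exec.dynamic.partition.mode=nonstrict",
        # The partition value comes from each row instead of being fixed.
        "INSERT INTO TABLE {t} PARTITION (event_date) SELECT * FROM {s}".format(
            t=target_table, s=source_view),
    ]
    for stmt in statements:
        spark.sql(stmt)
    return statements
```

Usage would look like insert_with_dynamic_partitions(spark, "events_flat",
"events_by_date") after registering the exploded DataFrame as a temp view.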

On Tue, 26 Sep 2017 at 12:39 pm, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi Amit,
>
> Spark keeps the partition that it is working on in memory (and does not
> spill to disk even if it is running out of memory). Also, since you are
> getting the OOM when using partitionBy (and not when you just use flatMap),
> there are probably one or a few dates for which the partition is bigger
> than the heap. You can do a count by date to check whether your data is
> skewed.
>
> The way out would be to increase the heap size or to use more columns in
> partitionBy (like date + hour) to distribute the data better.
>
> Hope this helps!
>
> Thanks
> Ankur
>
> On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩  wrote:
>
>> Hi, Amit,
>>
>> Maybe you can change the spark.sql.shuffle.partitions configuration.
>> The default is 200; changing this property changes the number of tasks
>> when you are using the DataFrame API.
>>
>> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
>>
>> I'm trying to run a simple PySpark application that reads from a file
>> (JSON), flattens it (explode) and writes back to a file (JSON) partitioned
>> by date using DataFrameWriter.partitionBy(*cols).
>>
>> I keep getting OOMEs like:
>> java.lang.OutOfMemoryError: Java heap space
>> at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>> at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
>> ...
>>
>> Explode could make the underlying RDD grow a lot, and sometimes in an
>> unbalanced way. On top of that, partitioning by date (in daily ETLs, for
>> instance) would probably cause data skew (right?). But why am I getting
>> OOMs? Isn't Spark supposed to spill to disk if the underlying RDD is too
>> big to fit in memory?
>>
>> If I'm not using "partitionBy" with the writer (still exploding),
>> everything works fine.
>>
>> This happens both on EMR and locally (Mac) in the pyspark/spark shell
>> (tried both in Python and Scala).
>>
>> Thanks!
>>
>>
>>
> --
Best Regards,
Ayan Guha


Re: partitionBy causing OOM

2017-09-25 Thread Ankur Srivastava
Hi Amit,

Spark keeps the partition that it is working on in memory (and does not
spill to disk even if it is running out of memory). Also, since you are
getting the OOM when using partitionBy (and not when you just use flatMap),
there are probably one or a few dates for which the partition is bigger than
the heap. You can do a count by date to check whether your data is skewed.

The way out would be to increase the heap size or to use more columns in
partitionBy (like date + hour) to distribute the data better.
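
The two suggestions above could be sketched in PySpark roughly as follows.
This is an illustration only: the helper names are made up, and the column
names `date` and `hour` are assumptions (the DataFrame would need an `hour`
column, which the original job may not have).

```python
def rows_per_key(df, *cols):
    """Return a DataFrame of row counts per key, largest first, to reveal skew."""
    return df.groupBy(*cols).count().orderBy("count", ascending=False)


def write_json_partitioned(df, path, cols=("date", "hour")):
    """Write JSON partitioned by several columns so each output partition is smaller."""
    # Assumes every column in `cols` exists in df (hypothetical column names).
    df.write.partitionBy(*cols).json(path)
```

For example, rows_per_key(df, "date").show(10) lists the ten heaviest dates;
if one date dominates, that is the skew being described.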

Hope this helps!

Thanks
Ankur

On Mon, Sep 25, 2017 at 7:30 PM, 孫澤恩  wrote:

> Hi, Amit,
>
> Maybe you can change the spark.sql.shuffle.partitions configuration.
> The default is 200; changing this property changes the number of tasks
> when you are using the DataFrame API.
>
> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
>
> I'm trying to run a simple PySpark application that reads from a file
> (JSON), flattens it (explode) and writes back to a file (JSON) partitioned
> by date using DataFrameWriter.partitionBy(*cols).
>
> I keep getting OOMEs like:
> java.lang.OutOfMemoryError: Java heap space
> at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
> at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
> at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
> ...
>
> Explode could make the underlying RDD grow a lot, and sometimes in an
> unbalanced way. On top of that, partitioning by date (in daily ETLs, for
> instance) would probably cause data skew (right?). But why am I getting
> OOMs? Isn't Spark supposed to spill to disk if the underlying RDD is too
> big to fit in memory?
>
> If I'm not using "partitionBy" with the writer (still exploding),
> everything works fine.
>
> This happens both on EMR and locally (Mac) in the pyspark/spark shell
> (tried both in Python and Scala).
>
> Thanks!
>
>
>


Re: partitionBy causing OOM

2017-09-25 Thread 孫澤恩
Hi, Amit,

Maybe you can change the spark.sql.shuffle.partitions configuration.
The default is 200; changing this property changes the number of tasks when
you are using the DataFrame API.
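
A rough PySpark sketch of how this setting could be applied. The helper name
and its arguments are hypothetical. Note that spark.sql.shuffle.partitions
only affects stages that shuffle; a plain read, explode, write job has no
shuffle, so the sketch adds one with repartition. With a heavily skewed key,
a single task would still receive all rows for the heaviest value.

```python
def repartition_by_key(spark, df, num_shuffle_partitions, *cols):
    """Set spark.sql.shuffle.partitions, then shuffle df by the given columns."""
    spark.conf.set("spark.sql.shuffle.partitions", str(num_shuffle_partitions))
    # With only column arguments, repartition uses spark.sql.shuffle.partitions
    # as the number of resulting partitions.
    return df.repartition(*cols)
```

A call such as repartition_by_key(spark, df, 400, "date") would be followed
by the usual df.write.partitionBy("date") on the returned DataFrame.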

> On 26 Sep 2017, at 1:25 AM, Amit Sela  wrote:
> 
> I'm trying to run a simple PySpark application that reads from a file
> (JSON), flattens it (explode) and writes back to a file (JSON) partitioned
> by date using DataFrameWriter.partitionBy(*cols).
>
> I keep getting OOMEs like:
> java.lang.OutOfMemoryError: Java heap space
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
>   at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
>   at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
> ...
>
> Explode could make the underlying RDD grow a lot, and sometimes in an
> unbalanced way. On top of that, partitioning by date (in daily ETLs, for
> instance) would probably cause data skew (right?). But why am I getting
> OOMs? Isn't Spark supposed to spill to disk if the underlying RDD is too
> big to fit in memory?
>
> If I'm not using "partitionBy" with the writer (still exploding),
> everything works fine.
>
> This happens both on EMR and locally (Mac) in the pyspark/spark shell
> (tried both in Python and Scala).
> 
> Thanks!



Unpersist all from memory in spark 2.2

2017-09-25 Thread Cesar
Is there a way to unpersist all DataFrames, Datasets, and/or RDDs in Spark
2.2 in a single call?
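
One possible approach from PySpark, sketched under assumptions:
spark.catalog.clearCache() is the public call that empties the SQL cache
(cached tables and DataFrames), while the RDD part below goes through `_jsc`,
a private handle to the JVM context, so it may change between versions. The
helper name is made up.

```python
def unpersist_everything(spark):
    """Clear the SQL cache and unpersist every persisted RDD.

    Returns the number of RDDs that were unpersisted.
    """
    # Public API: removes all cached tables and DataFrames from the in-memory cache.
    spark.catalog.clearCache()
    # Assumption: _jsc is a private attribute wrapping the JavaSparkContext;
    # getPersistentRDDs() returns a map of RDD id -> persisted Java RDD.
    persistent = spark.sparkContext._jsc.getPersistentRDDs()
    count = 0
    for _rdd_id, rdd in list(persistent.items()):
        rdd.unpersist()
        count += 1
    return count
```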


Thanks
-- 
Cesar Flores


Announcing Spark on Kubernetes release 0.4.0

2017-09-25 Thread Erik Erlandson
The Spark on Kubernetes development community is pleased to announce
release 0.4.0 of Apache Spark with native Kubernetes scheduler back-end!

The dev community is planning to use this release as the reference for
upstreaming native kubernetes capability over the Spark 2.3 release cycle.

This release includes a variety of bug fixes and code improvements, as well
as the following new features:

   - HDFS rack locality support
   - Mount small files using secrets, without running the resource staging
   server
   - Java options exposed to executor pods
   - User specified secrets injection for driver and executor pods
   - Unit testing for the Kubernetes scheduler backend
   - Standardized docker image build scripting
   - Reference YAML for RBAC configurations

The full release notes are available here:
https://github.com/apache-spark-on-k8s/spark/releases/tag/v2.2.0-kubernetes-0.4.0

Community resources for Spark on Kubernetes are available at:

   - Slack: https://kubernetes.slack.com
   - User Docs: https://apache-spark-on-k8s.github.io/userdocs/
   - GitHub: https://github.com/apache-spark-on-k8s/spark


Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-25 Thread Gokula Krishnan D
Thanks for the reply. I forgot to mention that our batch ETL jobs are
written in core Spark.


On Sep 22, 2017, at 3:13 PM, Vadim Semenov 
wrote:

1. 40s is pretty negligible unless you run your job very frequently; many
factors can influence that.

2. Try to compare the CPU time instead of the wall-clock time

3. Check the stages that got slower and compare the DAGs

4. Test with dynamic allocation disabled

On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D 
wrote:

> Hello All,
>
> Currently our batch ETL jobs run on Spark 1.6.0, and we are planning to
> upgrade to Spark 2.1.0.
>
> With minor code changes (like configuration and Spark Session.sc), we were
> able to run the existing job on Spark 2.1.0.
>
> But we noticed that job completion times are much better on Spark 1.6.0
> than on Spark 2.1.0.
>
> For instance, job A completed in 50s on Spark 1.6.0.
>
> With the same input, job A completed in 1.5 mins on Spark 2.1.0.
>
> Are there any specific factors that need to be considered when switching
> from Spark 1.6.0 to Spark 2.1.0?
>
>
>
> Thanks & Regards,
> Gokula Krishnan (Gokul)
>


partitionBy causing OOM

2017-09-25 Thread Amit Sela
I'm trying to run a simple PySpark application that reads from a file (JSON),
flattens it (explode) and writes back to a file (JSON) partitioned by date
using DataFrameWriter.partitionBy(*cols).

I keep getting OOMEs like:
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.<init>(UnsafeSorterSpillWriter.java:46)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:203)
...

Explode could make the underlying RDD grow a lot, and sometimes in an
unbalanced way. On top of that, partitioning by date (in daily ETLs, for
instance) would probably cause data skew (right?). But why am I getting
OOMs? Isn't Spark supposed to spill to disk if the underlying RDD is too big
to fit in memory?

If I'm not using "partitionBy" with the writer (still exploding), everything
works fine.

This happens both on EMR and locally (Mac) in the pyspark/spark shell (tried
both in Python and Scala).

Thanks!


How to write dataframe to kafka topic in spark streaming application using pyspark?

2017-09-25 Thread umargeek
Can anyone provide a code snippet or steps for writing a data frame to a
Kafka topic in a Spark Streaming application using PySpark, with Spark 2.1.1
and Kafka 0.8 (direct stream approach)?
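
One common pattern, sketched here under assumptions, is to send rows from
inside foreachPartition with a producer client created on the executor. The
helper below is hypothetical, and the wiring shown in the trailing comments
assumes the third-party kafka-python package (its KafkaProducer class); the
broker address and topic name are placeholders.

```python
import json


def send_partition(rows, topic, make_producer):
    """Send every row of one partition to a Kafka topic as UTF-8 JSON.

    make_producer is a zero-argument factory so the producer is created on
    the executor, once per partition, rather than pickled from the driver.
    Returns the number of records sent.
    """
    producer = make_producer()
    sent = 0
    try:
        for row in rows:
            # Row objects expose asDict(); plain dicts are passed through.
            record = row.asDict() if hasattr(row, "asDict") else row
            # default=str turns values json cannot encode (e.g. dates) into strings.
            producer.send(topic, json.dumps(record, default=str).encode("utf-8"))
            sent += 1
        producer.flush()
    finally:
        producer.close()
    return sent


# Hypothetical wiring inside a streaming job (needs pyspark and kafka-python):
#   from kafka import KafkaProducer
#   make = lambda: KafkaProducer(bootstrap_servers="broker:9092")
#   stream.foreachRDD(
#       lambda rdd: rdd.foreachPartition(
#           lambda rows: send_partition(rows, "events", make)))
```

For a DataFrame built inside foreachRDD, df.rdd.foreachPartition(...) can be
used in the same way.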

Thanks,
Umar



hive2 query using SparkSQL seems wrong

2017-09-25 Thread Cinyoung Hur
Hi,
I'm using Hive 2.3.0, Spark 2.1.1, and Zeppelin 0.7.2.

When I submit a query in the Hive interpreter, it works fine.
I can see exactly the same query in the Zeppelin notebook and the HiveServer2
web UI.

However, when I submit the query using Spark SQL, the query seems wrong.
For example, every column is wrapped in double quotes, like this:

SELECT
"component_2015.spec_id_sno","component_2015.jid","component_2015.fom_tp_cd","component_2015.dif",...
FROM component_2015


And the query just finishes without any results.
Is this a problem with Spark or with Hive?
Please help me.

Regards,
Cinyoung


Re: Offline environment

2017-09-25 Thread Georg Heiler
Just build a fat jar and do not use --packages.

serkan ta? wrote on Mon, 25 Sep 2017 at 09:24:

> Hi,
>
> Every time I submit a Spark job, it checks for the dependent jars in the
> remote Maven repo.
>
> Is it possible to make Spark load the cached jars first rather than
> looking for an internet connection?
>


Offline environment

2017-09-25 Thread serkan ta?
Hi,

Every time I submit a Spark job, it checks for the dependent jars in the
remote Maven repo.

Is it possible to make Spark load the cached jars first rather than looking
for an internet connection?