RE: Spark SQL driver memory keeps rising

2016-06-16 Thread Mohammed Guller
I haven’t read the code yet, but when you invoke spark-submit, where are you 
specifying --master yarn --deploy-mode client? Is it in the default config file 
and are you sure that spark-submit is reading the right file?
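
For example, either of these would make it explicit (a sketch; the values and
script name are placeholders, not taken from your job):

  spark-submit --master yarn --deploy-mode client ... your_job.py

or, in conf/spark-defaults.conf on the machine that runs spark-submit:

  spark.master             yarn
  spark.submit.deployMode  client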

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Thursday, June 16, 2016 11:45 AM
To: Mohammed Guller
Cc: user
Subject: Re: Spark SQL driver memory keeps rising

I'm using pyspark and running in YARN client mode. I managed to anonymize the 
code a bit and pasted it below.

You'll notice that I don't collect any output in the driver, instead the data 
is written to parquet directly. Also notice that I increased 
spark.driver.maxResultSize to 10g because the job was failing with the error 
"serialized results of x tasks () is bigger than spark.driver.maxResultSize 
(xxx)", which means the tasks were sending something to the driver, and that's 
probably what's causing the driver memory usage to keep rising. This happens at 
stages that read/write shuffled data (as opposed to input stages).

I also noticed this message appear many times in the output "INFO: 
MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 
4 to ip-xx-xx-xx-xx.ec2.internal:49490". I'm not sure if it's relevant.

Cluster setup:
- AWS EMR 4.7.1 using Spark 1.6.1
- 200 nodes (r3.2xlarge, 61 GB memory each); the master node is an r3.4xlarge
(122 GB memory)
- 1.4 TB of JSON logs split across ~70k files in S3


# spark-submit --driver-memory 100g --executor-memory 44g --executor-cores 8 \
#   --conf spark.yarn.executor.memoryOverhead=9000 --num-executors 199 \
#   log_cleansing.py
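#
# (for reference: 44 GB executor memory + 9000 MB memoryOverhead ~ 53 GB per
#  executor container, which fits within the 61 GB of an r3.2xlarge)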

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# the direct output committers below write straight to the final output location
# (useful with S3), skipping the rename from a temporary directory
conf = SparkConf()
conf.setAppName('log_cleansing') \
    .set("spark.driver.maxResultSize", "10g") \
    .set("spark.hadoop.spark.sql.parquet.output.committer.class",
         "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter") \
    .set("spark.hadoop.mapred.output.committer.class",
         "org.apache.hadoop.mapred.DirectFileOutputCommitter") \
    .set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "3600")

# raw data is stored in s3 and is split across 10s of thousands of files
# (each file is about 20MB, total data set size is around 1.4 TB)
# so we coalesce to a manageable number of partitions and store it in hdfs
text = sc.textFile("s3://json_logs/today/*/*").coalesce(400)
text.saveAsTextFile("hdfs:///json_logs/today")

# now read the json data set from hdfs
json_logs = sqlContext.read.json("hdfs:///json_logs/today")
sqlContext.registerDataFrameAsTable(json_logs, "json_logs")

# clean the date and only pull in events within two days of their
# ingestion ts; eliminate pure duplicates
clean_date_and_gap = sqlContext.sql(
    '''SELECT DISTINCT t.*
       FROM json_logs t
       WHERE datediff(to_utc_timestamp(`ingestionTime`, 'UTC'),
                      to_utc_timestamp(`timestamp`, 'UTC')) < 2
         AND to_utc_timestamp(`timestamp`, 'UTC') IS NOT NULL''')
sqlContext.registerDataFrameAsTable(clean_date_and_gap, "clean_date_and_gap")
clean_date_and_gap.cache()

# extract unique event instanceIds and their min ingestion ts
distinct_instanceIds = sqlContext.sql(
    '''SELECT
         instanceId,
         min(to_utc_timestamp(`ingestionTime`, 'UTC')) min_ingestion_ts
       FROM clean_date_and_gap
       GROUP BY instanceId''')
sqlContext.registerDataFrameAsTable(distinct_instanceIds, "distinct_instanceIds")

# create table with only the min-ingestion-ts records
deduped_day = sqlContext.sql(
    '''SELECT t.*
       FROM clean_date_and_gap t
       JOIN distinct_instanceIds d
         ON (d.instanceId = t.instanceId AND
             to_utc_timestamp(`ingestionTime`, 'UTC') = d.min_ingestion_ts)''')

# drop weird columns
deduped_day = deduped_day.drop('weird column name1') \
 .drop('weird column name2') \
 .drop('weird column name3')

# standardize all column names so that Parquet is happy;
# rename() is a small helper (defined elsewhere, not shown here) that adds clean
# alias names to hide problematic column names
oldCols = deduped_day.schema.names
deduped_day_1 = deduped_day.select(*(rename(oldCols)))

sqlContext.registerDataFrameAsTable(deduped_day_1, 'deduped_day_1')
deduped_day_1.cache()

clean_date_and_gap.unpersist()

# load prior days' instanceIds for deduping purposes
last1_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_1day/*")
last2_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_2day/*")
prior_instanceIds = last1_instanceIds.unionAll(last2_instanceIds).distinct().cache()
sqlContext.registerDataFrameAsTable(prior_instanceIds, 'prior_instanceIds')

# determine

o.Instanceid)
WHERE o.seen_before = 0''')

final_cleaned_data = final_cleaned_data.coalesce(200)
final_cleaned_data.write.parquet("s3://cleaned_data/today.parquet")


On Wed, Jun 15, 2016 at 10:23 PM, Mohammed Guller <moham...@glassbeam.com>
wrote:

> It would be hard to guess what could be going on without looking at the
> code. It looks like the driver program goes into a long stop-the-world GC
> pause. This should not happen on the machine running the driver program if
> all you are doing is reading data from HDFS, performing a bunch of
> transformations, and writing the results back into HDFS.
>
>
>
> Perhaps, the program is not actually using Spark in cluster mode, but
> running Spark in local mode?
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Khaled Hammouda [mailto:khaled.hammo...@kik.com]
> *Sent:* Tuesday, June 14, 2016 10:23 PM
> *To:* user
> *Subject:* Spark SQL driver memory keeps rising
>
>
>
> I'm having trouble with a Spark SQL job in which I run a series of SQL
> transformations on data loaded from HDFS.
>
>
>
> The first two stages load data from hdfs input without issues, but later
> stages that require shuffles cause the driver memory to keep rising until
> it is exhausted, and then the driver stalls, the spark UI stops responding,
> and I can't even kill the driver with ^C; I have to forcibly kill the
> process.
>
>
>
> I think I'm allocating enough memory to the driver: driver memory is 44
> GB, and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory
> usage, the driver memory before the shuffle starts is at about 2.4 GB
> (virtual mem size for the driver process is about 50 GB), and then once the
> stages that require shuffle start I can see the driver memory rising fast
> to about 47 GB, then everything stops responding.
>
>
>
> I'm not invoking any output operation that collects data at the driver. I
> just call .cache() on a couple of dataframes since they get used more than
> once in the SQL transformations, but those should be cached on the workers.
> Then I write the final result to a parquet file, but the job doesn't get to
> this final stage.
>
>
>
> What could possibly be causing the driver memory to rise that fast when no
> data is being collected at the driver?
>
>
>
> Thanks,
>
> Khaled
>


Re: Spark SQL driver memory keeps rising

2016-06-15 Thread Mich Talebzadeh
You will need to be more specific about how you are using these parameters.

Have you looked at the Spark web UI (default port 4040) to see the jobs and
stages? The amount of shuffle is also shown there.

It also helps if you run jps on the OS and send the output of ps aux | grep <PID>
as well.

What sort of resource manager are you using? Have you looked at yarn logs?
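
For example (a sketch; <driver-pid> and <application-id> are placeholders):

  jps                                         # list JVM processes and find the driver's PID
  ps aux | grep <driver-pid>                  # resident/virtual memory of the driver process
  yarn logs -applicationId <application-id>   # aggregated YARN container logs for the job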

HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 16 June 2016 at 03:23, Mohammed Guller <moham...@glassbeam.com> wrote:

> It would be hard to guess what could be going on without looking at the
> code. It looks like the driver program goes into a long stop-the-world GC
> pause. This should not happen on the machine running the driver program if
> all you are doing is reading data from HDFS, performing a bunch of
> transformations, and writing the results back into HDFS.
>
>
>
> Perhaps, the program is not actually using Spark in cluster mode, but
> running Spark in local mode?
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> <http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>
>
>
>
> *From:* Khaled Hammouda [mailto:khaled.hammo...@kik.com]
> *Sent:* Tuesday, June 14, 2016 10:23 PM
> *To:* user
> *Subject:* Spark SQL driver memory keeps rising
>
>
>
> I'm having trouble with a Spark SQL job in which I run a series of SQL
> transformations on data loaded from HDFS.
>
>
>
> The first two stages load data from hdfs input without issues, but later
> stages that require shuffles cause the driver memory to keep rising until
> it is exhausted, and then the driver stalls, the spark UI stops responding,
> and I can't even kill the driver with ^C; I have to forcibly kill the
> process.
>
>
>
> I think I'm allocating enough memory to the driver: driver memory is 44
> GB, and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory
> usage, the driver memory before the shuffle starts is at about 2.4 GB
> (virtual mem size for the driver process is about 50 GB), and then once the
> stages that require shuffle start I can see the driver memory rising fast
> to about 47 GB, then everything stops responding.
>
>
>
> I'm not invoking any output operation that collects data at the driver. I
> just call .cache() on a couple of dataframes since they get used more than
> once in the SQL transformations, but those should be cached on the workers.
> Then I write the final result to a parquet file, but the job doesn't get to
> this final stage.
>
>
>
> What could possibly be causing the driver memory to rise that fast when no
> data is being collected at the driver?
>
>
>
> Thanks,
>
> Khaled
>


RE: Spark SQL driver memory keeps rising

2016-06-15 Thread Mohammed Guller
It would be hard to guess what could be going on without looking at the code. 
It looks like the driver program goes into a long stop-the-world GC pause. This 
should not happen on the machine running the driver program if all you are
doing is reading data from HDFS, performing a bunch of transformations, and
writing the results back into HDFS.

Perhaps, the program is not actually using Spark in cluster mode, but running 
Spark in local mode?
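
One quick way to rule that out (a minimal sketch, not from your script): print the
master that the running SparkContext actually resolved, e.g.

  print(sc.master)   # expect "yarn" or "yarn-client" here, not "local[*]"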

Mohammed
Author: Big Data Analytics with 
Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Tuesday, June 14, 2016 10:23 PM
To: user
Subject: Spark SQL driver memory keeps rising

I'm having trouble with a Spark SQL job in which I run a series of SQL 
transformations on data loaded from HDFS.

The first two stages load data from hdfs input without issues, but later stages 
that require shuffles cause the driver memory to keep rising until it is 
exhausted, and then the driver stalls, the spark UI stops responding, and I
can't even kill the driver with ^C; I have to forcibly kill the process.

I think I'm allocating enough memory to the driver: driver memory is 44 GB, and 
spark.driver.memoryOverhead is 4.5 GB. When I look at the memory usage, the 
driver memory before the shuffle starts is at about 2.4 GB (virtual mem size 
for the driver process is about 50 GB), and then once the stages that require 
shuffle start I can see the driver memory rising fast to about 47 GB, then 
everything stops responding.

I'm not invoking any output operation that collects data at the driver. I just 
call .cache() on a couple of dataframes since they get used more than once in 
the SQL transformations, but those should be cached on the workers. Then I 
write the final result to a parquet file, but the job doesn't get to this final 
stage.

What could possibly be causing the driver memory to rise that fast when no data 
is being collected at the driver?

Thanks,
Khaled


Spark SQL driver memory keeps rising

2016-06-14 Thread Khaled Hammouda
I'm having trouble with a Spark SQL job in which I run a series of SQL
transformations on data loaded from HDFS.

The first two stages load data from hdfs input without issues, but later
stages that require shuffles cause the driver memory to keep rising until
it is exhausted, and then the driver stalls, the spark UI stops responding,
and I can't even kill the driver with ^C; I have to forcibly kill the
process.

I think I'm allocating enough memory to the driver: driver memory is 44 GB,
and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory usage,
the driver memory before the shuffle starts is at about 2.4 GB (virtual mem
size for the driver process is about 50 GB), and then once the stages that
require shuffle start I can see the driver memory rising fast to about 47
GB, then everything stops responding.

I'm not invoking any output operation that collects data at the driver. I
just call .cache() on a couple of dataframes since they get used more than
once in the SQL transformations, but those should be cached on the workers.
Then I write the final result to a parquet file, but the job doesn't get to
this final stage.

What could possibly be causing the driver memory to rise that fast when no
data is being collected at the driver?

Thanks,
Khaled


SQL Driver

2016-04-19 Thread AlexModestov
Hello all,
I pass the following option when launching Sparkling-Water:
"--conf spark.driver.extraClassPath='/SQLDrivers/sqljdbc_4.2/enu/sqljdbc41.jar"
and I get this error:
"
---
TypeError Traceback (most recent call last)
<ipython-input-...> in <module>()
  1 from pysparkling import *
> 2 hc = H2OContext(sc).start()

/tmp/modestov/spark/work/spark-5695a33c-905d-4af5-a719-88b7be0e0c45/userFiles-77e075c2-41cc-44d6-96fb-a2668b112133/pySparkling-1.6.1-py2.7.egg/pysparkling/context.py
in __init__(self, sparkContext)
 70 def __init__(self, sparkContext):
 71 try:
---> 72 self._do_init(sparkContext)
 73 # Hack H2OFrame from h2o package
 74 _monkey_patch_H2OFrame(self)

/tmp/modestov/spark/work/spark-5695a33c-905d-4af5-a719-88b7be0e0c45/userFiles-77e075c2-41cc-44d6-96fb-a2668b112133/pySparkling-1.6.1-py2.7.egg/pysparkling/context.py
in _do_init(self, sparkContext)
 94 gw = self._gw
 95 
---> 96 self._jhc = jvm.org.apache.spark.h2o.H2OContext.getOrCreate(sc._jsc)
 97 self._client_ip = None
 98 self._client_port = None

TypeError: 'JavaPackage' object is not callable"
What does it mean?


