RE: Spark SQL driver memory keeps rising
I haven't read the code yet, but when you invoke spark-submit, where are you specifying --master yarn --deploy-mode client? Is it in the default config file, and are you sure that spark-submit is reading the right file?

Mohammed
Author: Big Data Analytics with Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Thursday, June 16, 2016 11:45 AM
To: Mohammed Guller
Cc: user
Subject: Re: Spark SQL driver memory keeps rising

I'm using pyspark and running in YARN client mode. I managed to anonymize the code a bit and pasted it below. You'll notice that I don't collect any output in the driver; instead, the data is written to Parquet directly.

Also notice that I increased spark.driver.maxResultSize to 10g because the job was failing with the error "serialized results of x tasks () is bigger than spark.driver.maxResultSize (xxx)", which means the tasks were sending something to the driver, and that's probably what's causing the driver memory usage to keep rising. This happens at stages that read/write shuffled data (as opposed to input stages).

I also noticed this message appear many times in the output: "INFO: MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 4 to ip-xx-xx-xx-xx.ec2.internal:49490". I'm not sure if it's relevant.

Cluster setup:
- AWS EMR 4.7.1 using Spark 1.6.1
- 200 nodes (r3.2xlarge, 61 GB memory); the master node is r3.4xlarge (122 GB memory)
- 1.4 TB of JSON logs split across ~70k files in S3

# spark-submit --driver-memory 100g --executor-memory 44g --executor-cores 8 \
#   --conf spark.yarn.executor.memoryOverhead=9000 --num-executors 199 log_cleansing.py

conf = SparkConf()
conf.setAppName('log_cleansing') \
    .set("spark.driver.maxResultSize", "10g") \
    .set("spark.hadoop.spark.sql.parquet.output.committer.class",
         "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter") \
    .set("spark.hadoop.mapred.output.committer.class",
         "org.apache.hadoop.mapred.DirectFileOutputCommitter") \
    .set("spark.hadoop.mapreduce.use.directfileoutputcommitter", "true")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "3600")

# Raw data is stored in S3 and is split across tens of thousands of files
# (each file is about 20 MB; total data set size is around 1.4 TB),
# so we coalesce to a manageable number of partitions and store it in HDFS.
text = sc.textFile("s3://json_logs/today/*/*").coalesce(400)
text.saveAsTextFile("hdfs:///json_logs/today")

# Now read the JSON data set from HDFS.
json_logs = sqlContext.read.json("hdfs:///json_logs/today")
sqlContext.registerDataFrameAsTable(json_logs, "json_logs")

# Clean the date, keep only events within two days of their ingestion
# timestamp, and eliminate pure duplicates.
clean_date_and_gap = sqlContext.sql(
    '''SELECT DISTINCT t.*
       FROM json_logs t
       WHERE datediff(to_utc_timestamp(`ingestionTime`, 'UTC'), to_utc_timestamp(`timestamp`, 'UTC')) < 2
         AND to_utc_timestamp(`timestamp`, 'UTC') IS NOT NULL''')
sqlContext.registerDataFrameAsTable(clean_date_and_gap, "clean_date_and_gap")
clean_date_and_gap.cache()

# Extract unique event instanceIds and their minimum ingestion timestamp.
distinct_instanceIds = sqlContext.sql(
    '''SELECT instanceId, min(to_utc_timestamp(`ingestionTime`, 'UTC')) min_ingestion_ts
       FROM clean_date_and_gap
       GROUP BY instanceId''')
sqlContext.registerDataFrameAsTable(distinct_instanceIds, "distinct_instanceIds")

# Create a table with only the min ingestion timestamp records.
deduped_day = sqlContext.sql(
    '''SELECT t.*
       FROM clean_date_and_gap t
       JOIN distinct_instanceIds d
         ON (d.instanceId = t.instanceId
             AND to_utc_timestamp(`ingestionTime`, 'UTC') = d.min_ingestion_ts)''')

# Drop weird columns.
deduped_day = deduped_day.drop('weird column name1') \
                         .drop('weird column name2') \
                         .drop('weird column name3')

# Standardize all column names so that Parquet is happy.
# rename() adds clean alias names to hide problematic column names.
oldCols = deduped_day.schema.names
deduped_day_1 = deduped_day.select(*(rename(oldCols)))
sqlContext.registerDataFrameAsTable(deduped_day_1, 'deduped_day_1')
deduped_day_1.cache()
clean_date_and_gap.unpersist()

# Load the prior days' instanceIds for deduping purposes.
last1_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_1day/*")
last2_instanceIds = sqlContext.read.parquet("s3://unique_instanceids/today_minus_2day/*")
prior_instanceIds = last1_instanceIds.unionAll(last2_instanceIds).distinct().cache()
sqlContext.registerDataFrameAsTable(prior_instanceIds, 'prior_instanceIds')

# determine
o.Instanceid) WHERE o.seen_before = 0''')

final_cleaned_data = final_cleaned_data.coalesce(200)
final_cleaned_data.write.parquet("s3://cleaned_data/today.parquet")
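On Mohammed's question above about where --master and --deploy-mode are coming from: a minimal check, assuming the PySpark 1.6 API (spark.submit.deployMode is only present when spark-submit actually set it), is to print the effective values from the running SparkContext:

# Print the master and deploy mode the application actually started with,
# plus the maxResultSize in effect. If the master is "local[*]" or the deploy
# mode is missing, the job is not running on YARN in client mode as intended.
print("master:        %s" % sc.master)
print("deploy mode:   %s" % sc.getConf().get("spark.submit.deployMode", "not set"))
print("maxResultSize: %s" % sc.getConf().get("spark.driver.maxResultSize", "default"))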
Re: Spark SQL driver memory keeps rising
You will need to be more specific about how you are using these parameters. Have you looked at the Spark Web UI (default port 4040) to see the jobs and stages? The amount of shuffle will also be shown there. It also helps if you run jps on the OS and send the output of ps aux|grep <PID> as well. What sort of resource manager are you using? Have you looked at the YARN logs?

HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com
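One quick way to give the kind of specifics Mich is asking for (which settings the job actually picked up from spark-defaults.conf, spark-submit flags, and SparkConf.set calls) is to dump the effective configuration at startup. A minimal sketch, assuming the PySpark 1.6 API, where getAll() returns only keys that were explicitly set somewhere:

# Log every configuration key the application actually picked up.
for key, value in sorted(sc.getConf().getAll()):
    print("%s = %s" % (key, value))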
RE: Spark SQL driver memory keeps rising
It would be hard to guess what could be going on without looking at the code. It looks like the driver program goes into a long stop-the-world GC pause. This should not happen on the machine running the driver program if all that you are doing is reading data from HDFS, performing a bunch of transformations, and writing the result back into HDFS.

Perhaps the program is not actually using Spark in cluster mode, but running Spark in local mode?

Mohammed
Author: Big Data Analytics with Spark<http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/>

From: Khaled Hammouda [mailto:khaled.hammo...@kik.com]
Sent: Tuesday, June 14, 2016 10:23 PM
To: user
Subject: Spark SQL driver memory keeps rising

I'm having trouble with a Spark SQL job in which I run a series of SQL transformations on data loaded from HDFS.

The first two stages load data from HDFS input without issues, but later stages that require shuffles cause the driver memory to keep rising until it is exhausted. Then the driver stalls, the Spark UI stops responding, and I can't even kill the driver with ^C; I have to forcibly kill the process.

I think I'm allocating enough memory to the driver: driver memory is 44 GB, and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory usage, the driver memory before the shuffle starts is at about 2.4 GB (virtual memory size for the driver process is about 50 GB), and then once the stages that require shuffle start, I can see the driver memory rising fast to about 47 GB, then everything stops responding.

I'm not invoking any output operation that collects data at the driver. I just call .cache() on a couple of dataframes since they get used more than once in the SQL transformations, but those should be cached on the workers. Then I write the final result to a Parquet file, but the job doesn't get to this final stage.

What could possibly be causing the driver memory to rise that fast when no data is being collected at the driver?

Thanks,
Khaled
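To make the distinction Mohammed draws above concrete, here is a minimal sketch (hypothetical paths and column name, standard DataFrame API): a read-transform-write pipeline keeps the data on the executors, while collect-style actions are what ship serialized task results back to the driver and what spark.driver.maxResultSize guards against.

# Stays on the executors: the driver only schedules tasks and tracks metadata.
df = sqlContext.read.json("hdfs:///json_logs/today")        # hypothetical path
df.where(df.instanceId.isNotNull()) \
  .write.parquet("hdfs:///cleaned/today.parquet")           # hypothetical path

# Pulls data back to the driver: collect() ships task results to the driver
# process and is limited by spark.driver.maxResultSize.
sample = df.limit(1000).collect()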
Spark SQL driver memory keeps rising
I'm having trouble with a Spark SQL job in which I run a series of SQL transformations on data loaded from HDFS.

The first two stages load data from HDFS input without issues, but later stages that require shuffles cause the driver memory to keep rising until it is exhausted. Then the driver stalls, the Spark UI stops responding, and I can't even kill the driver with ^C; I have to forcibly kill the process.

I think I'm allocating enough memory to the driver: driver memory is 44 GB, and spark.driver.memoryOverhead is 4.5 GB. When I look at the memory usage, the driver memory before the shuffle starts is at about 2.4 GB (virtual memory size for the driver process is about 50 GB), and then once the stages that require shuffle start, I can see the driver memory rising fast to about 47 GB, then everything stops responding.

I'm not invoking any output operation that collects data at the driver. I just call .cache() on a couple of dataframes since they get used more than once in the SQL transformations, but those should be cached on the workers. Then I write the final result to a Parquet file, but the job doesn't get to this final stage.

What could possibly be causing the driver memory to rise that fast when no data is being collected at the driver?

Thanks,
Khaled
SQL Driver
Hello all,

I use this string when launching Sparkling Water:

"--conf spark.driver.extraClassPath='/SQLDrivers/sqljdbc_4.2/enu/sqljdbc41.jar"

and I get the error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
 in ()
      1 from pysparkling import *
----> 2 hc = H2OContext(sc).start()

/tmp/modestov/spark/work/spark-5695a33c-905d-4af5-a719-88b7be0e0c45/userFiles-77e075c2-41cc-44d6-96fb-a2668b112133/pySparkling-1.6.1-py2.7.egg/pysparkling/context.py in __init__(self, sparkContext)
     70     def __init__(self, sparkContext):
     71         try:
---> 72             self._do_init(sparkContext)
     73             # Hack H2OFrame from h2o package
     74             _monkey_patch_H2OFrame(self)

/tmp/modestov/spark/work/spark-5695a33c-905d-4af5-a719-88b7be0e0c45/userFiles-77e075c2-41cc-44d6-96fb-a2668b112133/pySparkling-1.6.1-py2.7.egg/pysparkling/context.py in _do_init(self, sparkContext)
     94         gw = self._gw
     95
---> 96         self._jhc = jvm.org.apache.spark.h2o.H2OContext.getOrCreate(sc._jsc)
     97         self._client_ip = None
     98         self._client_port = None

TypeError: 'JavaPackage' object is not callable

What does it mean?