Re: Spark Executor OOMs when writing Parquet
Yes. Disk spill can be a huge performance hit; with smaller partitions you may avoid this and possibly complete your job faster. I hope you don't get OOM.

On Sat, 18 Jan 2020 at 10:06, Arwin Tio wrote:

> Okay! I didn't realize you can pump those partition numbers up that high.
> 15000 partitions still failed. I am trying 3 partitions now. There is
> still some disk spill but it is not that high.
>
> Thanks,
>
> Arwin
Re: Spark Executor OOMs when writing Parquet
Okay! I didn't realize you can pump those partition numbers up that high. 15000 partitions still failed. I am trying 3 partitions now. There is still some disk spill but it is not that high.

Thanks,

Arwin

From: Chris Teoh
Sent: January 17, 2020 7:32 PM
To: Arwin Tio
Cc: user @spark
Subject: Re: Spark Executor OOMs when writing Parquet

You also have disk spill, which is a performance hit. Try multiplying the number of partitions by about 20x-40x and see if you can eliminate shuffle spill.

On Fri, 17 Jan 2020, 10:37 pm Arwin Tio wrote:

Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was under the impression that memory spill is OK? (If you're wondering, this is EMR.)

From: Chris Teoh
Sent: January 17, 2020 10:30 AM
To: Arwin Tio
Cc: user @spark
Subject: Re: Spark Executor OOMs when writing Parquet

Sounds like you don't have enough partitions. Try and repartition to 14496 partitions. Are your stages experiencing shuffle spill?

On Fri, 17 Jan 2020, 10:12 pm Arwin Tio wrote:

Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...)

df
  .repartition(5000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire dataset is about 1 TB (compressed).

The error looks like this:

```
20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
        at sun.misc.Unsafe.allocateMemory(Native Method)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
        at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
        at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
        at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
        at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
        at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
        at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
        at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
        at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
        at
```
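An aside for readers of this thread, beyond the repartitioning advice above: the OutOfMemoryError in the trace is raised while allocating a DirectByteBuffer in the Snappy compressor, i.e. off-heap direct memory rather than heap. Giving executors more off-heap headroom sometimes helps in this situation; the flags and values below are illustrative assumptions to tune for your cluster, not a confirmed fix from this thread.

```shell
# Illustrative spark-submit flags (values are placeholders, not recommendations):
spark-submit \
  --conf spark.executor.memoryOverhead=4g \
  --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
  your_job.jar
```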
Extract value from streaming Dataframe to a variable
I need to extract a value from a PySpark structured streaming DataFrame into a string variable so I can check something. I tried this:

agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]

This works only on a non-streaming DataFrame; on a streaming DataFrame, collect() is not supported. Is there any workaround for this?

Nick
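Since collect() is unsupported on a streaming DataFrame, one common workaround is foreachBatch: inside the callback each micro-batch arrives as a plain DataFrame, where collect() is allowed. A minimal sketch follows; the kinesisDF name and agentName column are taken from the question, while the first_value helper and check_agent callback are hypothetical names.

```python
def first_value(rows, col):
    # Pull one value out of an already-collected list of Rows (or dicts);
    # returns None when the batch was empty.
    return rows[0][col] if rows else None

def check_agent(batch_df, batch_id):
    # Inside foreachBatch, batch_df is a normal (non-streaming) DataFrame,
    # so limit(1).collect() is legal here.
    agent_name = first_value(
        batch_df.select("agentName").limit(1).collect(), "agentName")
    if agent_name is not None:
        print(f"batch {batch_id}: agentName = {agent_name}")

# Hypothetical wiring into the stream:
# query = kinesisDF.writeStream.foreachBatch(check_agent).start()
```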
Is there a way to get the final web URL from an active Spark context
Given a session/context, we can get the UI web URL like this:

sparkSession.sparkContext.uiWebUrl

This gives me something like http://node-name.cluster-name:4040. When opened from outside the cluster (e.g. my laptop), it redirects via HTTP 302 to something like http://node-name.cluster-name:8088/proxy/redirect/application_1579210019853_0023/. For discussion purposes, call the latter the "final web URL". Critically, this final URL stays active even after the application terminates. The original uiWebUrl (http://node-name.cluster-name:4040) is not available after the application terminates, so one has to have captured the redirect in time in order to provide a persistent link to that history-server UI entry (e.g. for debugging purposes).

Is there a way, other than using some HTTP client, to detect what this final URL will be directly from the SparkContext?

- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
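One workaround worth noting: on YARN the proxied URL follows a predictable pattern, so it can be built from the ResourceManager web address and sc.applicationId without an HTTP client. This is a sketch under the assumption of the standard YARN web-proxy layout; the exact path segment ("proxy" vs "proxy/redirect") and the ResourceManager address vary by Hadoop setup.

```python
def yarn_proxy_url(rm_web_address, application_id, segment="proxy"):
    # Build the persistent proxy URL for a Spark application; `segment`
    # may need to be "proxy/redirect" on some Hadoop versions.
    return f"{rm_web_address.rstrip('/')}/{segment}/{application_id}/"

# Hypothetical usage with a live context:
# url = yarn_proxy_url("http://node-name.cluster-name:8088",
#                      spark.sparkContext.applicationId)
```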
Record count query parallel processing in databricks spark delta lake
Hi, I have a question about the design of a monitoring PySpark script over a large volume of source JSON data coming from more than 100 Kafka topics. The topics are stored under separate buckets in AWS S3; each topic holds terabytes of JSON data partitioned by year, month, day, and hour, and each hour contains many .gz-compressed JSON files.

What is the best way to process terabytes of data read from S3 under the year/month/day/hour partitions for all topic sources? We are using Delta Lake on the Databricks platform, and the query takes a long time to get the count of records by year/month/date. What is the best approach to get the record counts for all days?

My current plan: put all 150 topics in a topics_list.csv file, then iterate over the topics from the CSV one by one with a for loop (or some other option), passing year, month, and day arguments to get the record count for the particular day:

df = spark.read.json("s3a://kafka-bucket_name/topic_name/year/month/day/hour/")
df.createOrReplaceTempView("topic1_source")
spark.sql("select count(1) from topic1_source")

Could you suggest a good approach to run the count query for all 150 topics in parallel, effectively, using Apache Spark and Delta Lake on Databricks?

Thanks

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
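One possible pattern instead of a sequential for loop: submit the per-topic count() actions from a driver-side thread pool, since Spark jobs submitted from separate threads can run concurrently within one application. This is a sketch; the bucket layout and the helper names are assumptions based on the path shown in the question.

```python
from concurrent.futures import ThreadPoolExecutor

def topic_path(bucket, topic, year, month, day):
    # Partition layout assumed from the question: bucket/topic/year/month/day/
    return f"s3a://{bucket}/{topic}/{year}/{month:02d}/{day:02d}/"

def count_topic(spark, bucket, topic, year, month, day):
    # count() runs as a distributed job; nothing is collected to the driver.
    return topic, spark.read.json(
        topic_path(bucket, topic, year, month, day)).count()

def count_all_topics(spark, bucket, topics, year, month, day, parallelism=16):
    # Threads only submit jobs; the cluster schedules them concurrently
    # (the FAIR scheduler helps share executors between concurrent jobs).
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        futures = [pool.submit(count_topic, spark, bucket, t, year, month, day)
                   for t in topics]
        return dict(f.result() for f in futures)
```

For repeated day-level counts it may also pay off to scan each day's .gz files once and persist the counts into a small Delta table, rather than re-reading raw JSON per query.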
Re: Cannot read case-sensitive Glue table backed by Parquet
Sorry, but my original solution is incorrect.

1. Glue Crawlers are not supposed to set the spark.sql.sources.schema.* properties; Spark SQL should. The default in Spark 2.4 for spark.sql.hive.caseSensitiveInferenceMode is INFER_AND_SAVE, which means that Spark infers the schema from the underlying files and alters the table to add the spark.sql.sources.schema.* properties to SERDEPROPERTIES. In our case, Spark failed to do so because of an IllegalArgumentException: "Can not create a Path from an empty string", which is thrown because the Hive Database class instance has an empty locationUri property string. That, in turn, happens because the Glue database does not have a Location property set. After the schema is saved, Spark reads it back from the table.

2. There could be a way around this by setting INFER_ONLY, which should only infer the schema from the files and not attempt to alter the table's SERDEPROPERTIES. However, this doesn't work because of a Spark bug, where the inferred schema is then lowercased [1].

[1] https://github.com/apache/spark/blob/c1b6fe479649c482947dfce6b6db67b159bd78a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L284
Re: Cannot read case-sensitive Glue table backed by Parquet
This bug happens because the Glue table's SERDEPROPERTIES is missing two important properties:

spark.sql.sources.schema.numParts
spark.sql.sources.schema.part.0

To solve the problem, I had to add those two properties via the Glue console (I couldn't do it with ALTER TABLE ...). I guess this is a bug in the Glue crawlers, which do not set these properties when creating the table.
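For reference, the same two properties can also be added programmatically. This is a hedged sketch using boto3's Glue client (get_table and update_table are real calls, but the exact TableInput field set, and whether your table wants the properties under SerdeInfo.Parameters or the table-level Parameters, should be verified against your table). The schema placeholder must be replaced with the real Spark-generated schema JSON.

```python
SCHEMA_PROPS = {
    "spark.sql.sources.schema.numParts": "1",
    "spark.sql.sources.schema.part.0": "<spark schema JSON goes here>",  # placeholder
}

def with_schema_props(serde_params, schema_props):
    # Pure merge step: existing serde parameters plus the Spark schema props.
    merged = dict(serde_params or {})
    merged.update(schema_props)
    return merged

def patch_glue_table(database, table, schema_props, region):
    import boto3  # imported here so the merge helper is usable without boto3

    glue = boto3.client("glue", region_name=region)
    t = glue.get_table(DatabaseName=database, Name=table)["Table"]
    t["StorageDescriptor"]["SerdeInfo"]["Parameters"] = with_schema_props(
        t["StorageDescriptor"]["SerdeInfo"].get("Parameters"), schema_props)
    # update_table accepts only the TableInput subset of get_table's output.
    allowed = {"Name", "Description", "Owner", "Retention", "StorageDescriptor",
               "PartitionKeys", "TableType", "Parameters"}
    glue.update_table(DatabaseName=database,
                      TableInput={k: v for k, v in t.items() if k in allowed})
```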