Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Chris Teoh
Yes. Disk spill can be a huge performance hit; with smaller partitions you
may avoid it and possibly complete your job faster. I hope you don't get
OOM.
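
To put rough numbers on this, here is a back-of-the-envelope sketch using the figures quoted further down in this thread (about 1 TB compressed, 5 billion rows, 2000 columns). It assumes 1 TB = 10^12 bytes and perfectly even partitions, neither of which is guaranteed in practice, and the function name is only for illustration.

```python
# Back-of-the-envelope sizing only. Assumes 1 TB = 10**12 bytes and perfectly
# even partitions; real partitions are skewed, and the in-memory (decompressed)
# size is larger than the compressed size.
DATASET_BYTES = 10**12      # "about 1 TB (compressed)"
TOTAL_ROWS = 5 * 10**9      # "about 5 billion rows"
COLUMNS = 2000              # "each with 2000 columns"


def per_partition(num_partitions):
    """Return (compressed MB, rows, cell values) handled by one partition."""
    mb = DATASET_BYTES / num_partitions / 10**6
    rows = TOTAL_ROWS // num_partitions
    return mb, rows, rows * COLUMNS


# Partition counts mentioned in the thread.
for n in (5000, 14496, 15000):
    mb, rows, cells = per_partition(n)
    print(f"{n:>6} partitions: ~{mb:.0f} MB compressed, "
          f"{rows:,} rows, {cells:,} values per task")
```

Under these assumptions, 5000 partitions means each task handles roughly 200 MB of compressed input and a million 2000-column rows.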

On Sat, 18 Jan 2020 at 10:06, Arwin Tio wrote:

> Okay! I didn't realize you can pump those partition numbers up that high.
> 15000 partitions still failed. I am trying 3 partitions now. There is
> still some disk spill but it is not that high.
>
> Thanks,
>
> Arwin
>
> --
> *From:* Chris Teoh 
> *Sent:* January 17, 2020 7:32 PM
> *To:* Arwin Tio 
> *Cc:* user @spark 
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> You also have disk spill which is a performance hit.
>
> Try multiplying the number of partitions by about 20x - 40x and see if you
> can eliminate shuffle spill.
>
> On Fri, 17 Jan 2020, 10:37 pm Arwin Tio wrote:
>
> Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was
> under the impression that memory spill is OK?
>
>
> (If you're wondering, this is EMR).
>
> --
> *From:* Chris Teoh 
> *Sent:* January 17, 2020 10:30 AM
> *To:* Arwin Tio 
> *Cc:* user @spark 
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Sounds like you don't have enough partitions. Try and repartition to 14496
> partitions. Are your stages experiencing shuffle spill?
>
> On Fri, 17 Jan 2020, 10:12 pm Arwin Tio wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset<Row> df = spark.read(...);
>
> df
>   .repartition(5000)
>   .write()
>   .format("parquet")
>   .parquet("s3://mypath/...");
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID
> 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13):
> org.apache.spark.SparkException: Task failed while writing rows.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
> at sun.misc.Unsafe.allocateMemory(Native Method)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
> at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
> at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
> at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
> at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
> at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
> at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
> at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
> at
> 

Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Arwin Tio
Okay! I didn't realize you can pump those partition numbers up that high. 15000 
partitions still failed. I am trying 3 partitions now. There is still some 
disk spill but it is not that high.

Thanks,

Arwin


From: Chris Teoh 
Sent: January 17, 2020 7:32 PM
To: Arwin Tio 
Cc: user @spark 
Subject: Re: Spark Executor OOMs when writing Parquet

You also have disk spill which is a performance hit.

Try multiplying the number of partitions by about 20x - 40x and see if you can 
eliminate shuffle spill.

On Fri, 17 Jan 2020, 10:37 pm Arwin Tio wrote:
Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was under 
the impression that memory spill is OK?

(If you're wondering, this is EMR).


From: Chris Teoh
Sent: January 17, 2020 10:30 AM
To: Arwin Tio
Cc: user @spark
Subject: Re: Spark Executor OOMs when writing Parquet

Sounds like you don't have enough partitions. Try and repartition to 14496 
partitions. Are your stages experiencing shuffle spill?

On Fri, 17 Jan 2020, 10:12 pm Arwin Tio wrote:
Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...);

df
  .repartition(5000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire 
dataset is about 1 TB (compressed).

The error looks like this:

```
  20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 
24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): 
org.apache.spark.SparkException: Task failed while writing rows.
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
at sun.misc.Unsafe.allocateMemory(Native Method)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at 
org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
at 
org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
at 
org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
at 
org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
at 
org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
at 
org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
at 
org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
at 
org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
at 
org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:113)
at 

Extract value from streaming Dataframe to a variable

2020-01-17 Thread Nick Dawes
I need to extract a value from a PySpark structured streaming Dataframe to
a string variable to check something.

I tried this code.

agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]

This works on a non-streaming Dataframe only. In a streaming Dataframe,
collect is not supported.

Any workaround for this?

Nick
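
One workaround that is often suggested for this, assuming Spark 2.4 or later, is `foreachBatch`: the handler receives each micro-batch as an ordinary (non-streaming) DataFrame, so driver-side actions such as `take` are allowed there. The sketch below is only illustrative; `latest`, `capture_agent_name`, and `start_capture` are hypothetical names, and it assumes `agentName` is an array column as in the snippet above.

```python
# Sketch only: `latest`, `capture_agent_name` and `start_capture` are
# hypothetical names, not part of the original question.
latest = {"agentName": None}


def capture_agent_name(batch_df, batch_id):
    """Called on the driver once per micro-batch; batch_df is a normal DataFrame."""
    # A SQL expression string keeps this free of extra imports, and take(1)
    # avoids pulling the whole batch back to the driver.
    rows = batch_df.selectExpr("agentName[0] AS agentName").take(1)
    if rows:
        latest["agentName"] = rows[0][0]


def start_capture(streaming_df):
    """Attach the handler to a streaming DataFrame and start the query."""
    return streaming_df.writeStream.foreachBatch(capture_agent_name).start()
```

With the DataFrame from the question, this would be started as `query = start_capture(kinesisDF)`; once at least one non-empty batch has been processed, `latest["agentName"]` holds the most recent value as a plain Python string.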


Is there a way to get the final web URL from an active Spark context

2020-01-17 Thread Jeff Evans
Given a session/context, we can get the UI web URL like this:

sparkSession.sparkContext.uiWebUrl

This gives me something like http://node-name.cluster-name:4040.  If
opening this from outside the cluster (ex: my laptop), this redirects
via HTTP 302 to something like
http://node-name.cluster-name:8088/proxy/redirect/application_1579210019853_0023/.
For discussion purposes, call the latter one the "final web URL".
Critically, this final URL is active even after the application
terminates.  The original uiWebUrl
(http://node-name.cluster-name:4040) is not available after the
application terminates, so one has to have captured the redirect in
time, if they want to provide a persistent link to that history server
UI entry (ex: for debugging purposes).

Is there a way, other than using some HTTP client, to detect what this
final URL will be directly from the SparkContext?
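
For illustration only, one way to avoid the HTTP client is to rebuild the URL from its parts rather than follow the redirect. The sketch below assumes the redirect pattern observed above stays the same on the cluster, which may not hold across Hadoop/YARN versions or setups. It also assumes the ResourceManager web address is known from cluster configuration (for example the Hadoop property `yarn.resourcemanager.webapp.address`). The application ID is available as `sparkContext.applicationId`; the function name `guess_final_ui_url` is hypothetical.

```python
# Sketch only. The URL layout is copied from the redirect observed above and
# is an assumption; it may differ across Hadoop/YARN versions and setups.
def guess_final_ui_url(rm_webapp_address, application_id,
                       proxy_path="proxy/redirect"):
    """Build the ResourceManager proxy URL for an application.

    rm_webapp_address: host:port of the YARN ResourceManager web UI,
        e.g. "node-name.cluster-name:8088".
    application_id: the value of sparkContext.applicationId.
    proxy_path: path prefix seen in the observed redirect (assumed stable).
    """
    return f"http://{rm_webapp_address}/{proxy_path.strip('/')}/{application_id}/"


print(guess_final_ui_url("node-name.cluster-name:8088",
                         "application_1579210019853_0023"))
# prints http://node-name.cluster-name:8088/proxy/redirect/application_1579210019853_0023/
```

This does not settle whether SparkContext exposes the final URL directly; it only shows that the pieces needed to reconstruct it are available without waiting for the redirect.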

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2020-01-17 Thread Bruno S. de Barros





unsubscribe

2020-01-17 Thread Christian Acuña



Record count query parallel processing in databricks spark delta lake

2020-01-17 Thread anbutech
Hi,

I have a question about designing a PySpark monitoring script for a large
volume of source JSON data coming from more than 100 Kafka topics.
These topics are stored under separate buckets in AWS S3. Each Kafka topic
has terabytes of JSON data, partitioned by year, month, day, and hour.
Each hour contains many JSON files in .gz compressed format.

What is the best way to process terabytes of data read from S3 under the
year/month/day/hour partitions for all the source topics?

We are using Delta Lake on the Databricks platform. The query takes a long
time to get the record counts by year, month, and date.

What is the best approach for handling terabytes of data to get the record
counts for all the days?

Please help me with the problem below:

topics_list.csv
--
I'm planning to put all 150 topics in the CSV file, then read it and process
the data to get the record count per day.

I have to iterate over the topics from the CSV file one by one, using a for
loop or some other option, passing the year, month, and date arguments
to get the record count for a particular day for all the topics.

df = spark.read.json("s3a://kafka-bucket_name/topic_name/year/month/day/hour/")

df.createOrReplaceTempView("topic1_source")

spark.sql("select count(1) from topic1_source")

Could you help me or suggest a good approach to run the query in parallel
for all the topics, so that I get the daily record count for all 150 topics
efficiently using Apache Spark and Delta Lake on Databricks?

thanks
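
As a rough illustration of running the per-topic counts in parallel, the sketch below submits one count per topic from a driver-side thread pool; Spark can schedule jobs submitted from different threads concurrently. The path layout is copied from the snippet above, and the single `bucket` argument, the `*` glob over hours, and all function names are assumptions made for the example. The actual counting call is passed in as `count_fn` so the helper itself does not depend on a live session.

```python
from concurrent.futures import ThreadPoolExecutor


def day_path(bucket, topic, year, month, day):
    """Glob over all hour folders of one day (layout assumed from the post)."""
    return f"s3a://{bucket}/{topic}/{year}/{month}/{day}/*/"


def count_all_topics(count_fn, bucket, topics, year, month, day, max_workers=8):
    """Run count_fn(path) for every topic concurrently; return {topic: count}."""
    paths = {t: day_path(bucket, t, year, month, day) for t in topics}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each submit() starts an independent job from its own driver thread.
        futures = {t: pool.submit(count_fn, p) for t, p in paths.items()}
        return {t: f.result() for t, f in futures.items()}


# With a live SparkSession named `spark` (not created here), one might pass:
#   count_fn = lambda path: spark.read.json(path).count()
```

The list of topics could be read from topics_list.csv with Python's `csv` module before calling `count_all_topics`; `max_workers` bounds how many counts are in flight at once.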


--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/



unsubscribe

2020-01-17 Thread Sethupathi T



Re: Cannot read case-sensitive Glue table backed by Parquet

2020-01-17 Thread oripwk
Sorry, but my original solution is incorrect.

1. Glue Crawlers are not supposed to set the spark.sql.sources.schema.*
properties, but Spark SQL should. The default in Spark 2.4 for
spark.sql.hive.caseSensitiveInferenceMode is INFER_AND_SAVE, which means that
Spark infers the schema from the underlying files and alters the table to
add the spark.sql.sources.schema.* properties to SERDEPROPERTIES. In our
case, Spark failed to do so because of an "IllegalArgumentException: Can not
create a Path from an empty string" exception, which is raised because the
Hive database class instance has an empty locationUri property string. That,
in turn, is because the Glue database does not have a Location property.
After the schema is saved, Spark reads it from the table.
2. There could be a way around this, by setting INFER_ONLY, which should
only infer the schema from the files and not attempt to alter the table
SERDEPROPERTIES. However, this doesn't work because of a Spark bug, where
the inferred schema is then lowercased [1].

[1]
https://github.com/apache/spark/blob/c1b6fe479649c482947dfce6b6db67b159bd78a3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L284







Re: Cannot read case-sensitive Glue table backed by Parquet

2020-01-17 Thread oripwk



This bug happens because the Glue table's SERDEPROPERTIES are missing two
important properties:

spark.sql.sources.schema.numParts
spark.sql.sources.schema.part.0

To solve the problem, I had to add those two properties via the Glue console
(couldn't do it with ALTER TABLE …)

I guess this is a bug with Glue crawlers, which do not set these properties
when creating the table.







unsubscribe

2020-01-17 Thread vijay krishna



unsubscribe

2020-01-17 Thread Pingxiao Ye