Storage Partition Join only works for buckets?

2023-11-08 Thread Arwin Tio
Hey team,

I was reading through the Storage Partition Join SPIP 
(https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE/edit#heading=h.82w8qxfl2uwl), 
but it seems like it only supports bucketed tables, not partitioned ones. Is that true? And 
if so, does anybody have an intuition for why - is it simply a bad idea?
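For context, the only SPJ-related switch I have come across so far is itself under a bucketing namespace. A minimal sketch of enabling it, assuming the config key I have seen is still current:

```java
// Assumed config key from the SPJ work; please double-check it for your Spark version.
spark.conf().set("spark.sql.sources.v2.bucketing.enabled", "true");
```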

Thanks,

Arwin



Using Streaming Listener in a Structured Streaming job

2020-12-28 Thread Arwin Tio
In a Structured Streaming job, the listener that is supported is 
StreamingQueryListener.

spark.streams().addListener(
  new StreamingQueryListener() {
    ...
  }
);
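
For reference, a fleshed-out listener can look roughly like the following; the callback names are the StreamingQueryListener API, while the println bodies are only illustrative:

```java
import org.apache.spark.sql.streaming.StreamingQueryListener;

spark.streams().addListener(new StreamingQueryListener() {
  @Override
  public void onQueryStarted(QueryStartedEvent event) {
    // Called once when the streaming query starts.
    System.out.println("Query started: " + event.id());
  }

  @Override
  public void onQueryProgress(QueryProgressEvent event) {
    // Called after every micro-batch with the progress metrics.
    System.out.println("Query made progress: " + event.progress());
  }

  @Override
  public void onQueryTerminated(QueryTerminatedEvent event) {
    // Called when the query stops or fails.
    System.out.println("Query terminated: " + event.id());
  }
});
```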

However, there is no straightforward way to use StreamingListener.

I have done it like this:

StreamingContext streamingContext =
  new StreamingContext(spark.sparkContext(), new Duration(1000));

streamingContext.addStreamingListener(
  new StreamingListener() {
    ...
  }
);

However, this is not working for me. Is there a way to use StreamingListener 
in a Structured Streaming query?

Thanks,

Arwin


Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Arwin Tio
Okay! I didn't realize you can pump those partition numbers up that high. 15000 
partitions still failed. I am trying 30,000 partitions now. There is still some 
disk spill, but it is not that high.

Thanks,

Arwin


From: Chris Teoh 
Sent: January 17, 2020 7:32 PM
To: Arwin Tio 
Cc: user @spark 
Subject: Re: Spark Executor OOMs when writing Parquet

You also have disk spill which is a performance hit.

Try multiplying the number of partitions by about 20x - 40x and see if you can 
eliminate shuffle spill.
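
For illustration, against the write shown further down in this thread, that would be something like the following sketch (the exact count is only an example in that 20x-40x range, and the path placeholder is from the original job):

```java
// Illustrative only: roughly 30x the original 5000 partitions.
df
  .repartition(150000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```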

On Fri, 17 Jan 2020, 10:37 pm Arwin Tio <arwin@hotmail.com> wrote:
Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was under 
the impression that memory spill is OK?

(If you're wondering, this is EMR).


From: Chris Teoh <chris.t...@gmail.com>
Sent: January 17, 2020 10:30 AM
To: Arwin Tio <arwin@hotmail.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Spark Executor OOMs when writing Parquet

Sounds like you don't have enough partitions. Try and repartition to 14496 
partitions. Are your stages experiencing shuffle spill?

On Fri, 17 Jan 2020, 10:12 pm Arwin Tio <arwin@hotmail.com> wrote:
Hello,

I have a fairly straightforward Spark job that converts CSV to Parquet:

```
Dataset<Row> df = spark.read(...);

df
  .repartition(5000)
  .write()
  .format("parquet")
  .parquet("s3://mypath/...");
```

For context, there are about 5 billion rows, each with 2000 columns. The entire 
dataset is about 1 TB (compressed).

The error looks like this:

```
20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError
    at sun.misc.Unsafe.allocateMemory(Native Method)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:127)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
    at org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
    at org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
    at org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
    at org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
    at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
    at org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
    at org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNullForMissingFieldsAtCurrentLevel(MessageColumnIO.java:337)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.endMessage(MessageColumnIO.java:299)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:424)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWri
```


In Catalyst expressions, when is it appropriate to use codegen

2019-08-29 Thread Arwin Tio
Hi,

I am exploring the usage of Catalyst expression functions to avoid the 
performance issues associated with UDFs.
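
As a concrete illustration of that contrast (the column name, UDF name, and the df/spark variables here are hypothetical, not from any real job): a registered UDF is a black box to Catalyst, while the equivalent built-in function is a Catalyst expression that can participate in codegen.

```java
import static org.apache.spark.sql.functions.*;

import org.apache.spark.api.java.function.UDF1;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;

// Hypothetical UDF, registered only for the comparison below.
spark.udf().register("myUpper",
    (UDF1<String, String>) s -> s == null ? null : s.toUpperCase(),
    DataTypes.StringType);

// Opaque to Catalyst: no codegen, plus conversion of values to external Java objects.
Dataset<Row> viaUdf = df.withColumn("u", callUDF("myUpper", col("name")));

// Built-in Catalyst expression: eligible for whole-stage codegen.
Dataset<Row> viaExpr = df.withColumn("u", upper(col("name")));
```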

One thing that I noticed is that there is a trait called CodegenFallback and 
there are some Catalyst expressions in Spark that inherit from it [0].

My question is: is there a technical limitation for some Catalyst expressions, 
like the ones in datetimeExpressions.scala, that makes codegen unsuitable? How do you evaluate 
whether or not a Catalyst expression should use codegen?

Thanks,

Arwin

[0] 
https://github.com/apache/spark/blob/3a4afce96c6840431ed45280742f9e969be19639/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L95


Re: Creating custom Spark-Native catalyst/codegen functions

2019-08-22 Thread Arwin Tio
Hey,

It seems like the GeoSpark repo is not publicly accessible?

But from the filepath it seems like the Spark codebase itself was forked or 
modified.

The examples that I've seen seem to suggest that you need to register custom 
Spark-Native functions inside Spark's private namespace like you said 
(FunctionRegistry.scala I believe).

I was wondering if it was possible to add the more efficient Spark-Native 
functions in my user application without having to fork or modify Spark itself.

Thanks,

Arwin

From: Georg Heiler
Sent: Wednesday, August 21, 11:18 PM
Subject: Re: Creating custom Spark-Native catalyst/codegen functions
To: Arwin Tio
Cc: user@spark.apache.org


Look at 
https://github.com/DataSystemsLab/GeoSpark/tree/master/sql/src/main/scala/org/apache/spark/sql/geosparksql
 for an example.


Using custom function registration and functions residing inside Spark's private 
namespace should work.

But I am not aware of a public user-facing API.
Is there one I am missing?


Arwin Tio <arwin@hotmail.com> wrote on Thu, 22 Aug 2019, 04:28:
Hi friends,

I am looking into converting some UDFs/UDAFs to Spark-Native functions to 
leverage Catalyst and codegen.

Looking through some examples (for example:  
https://github.com/apache/spark/pull/7214/files for Levenshtein) it seems like 
we need to add these functions to the Spark framework itself.

Is there a way to add custom Spark-Native functions in "userspace"?

Thank you!

Arwin




Creating custom Spark-Native catalyst/codegen functions

2019-08-21 Thread Arwin Tio
Hi friends,

I am looking into converting some UDFs/UDAFs to Spark-Native functions to 
leverage Catalyst and codegen.

Looking through some examples (for example: 
https://github.com/apache/spark/pull/7214/files for Levenshtein) it seems like 
we need to add these functions to the Spark framework itself.

Is there a way to add custom Spark-Native functions in "userspace"?

Thank you!

Arwin


Parquet 'bucketBy' creates a ton of files

2019-07-04 Thread Arwin Tio
I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors 
(not sure of the terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500x500 = 250,000 bucketed parquet files! It takes forever for the 
`FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a 
better way to deal with this problem? As of now it seems like I have to choose 
between lowering the parallelism of my cluster (reduce number of writers) or 
reducing the parallelism of my parquet files (reduce number of buckets), which 
will lower the parallelism of my downstream jobs.
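
One workaround that gets suggested for this kind of blow-up (a sketch only; it assumes the hash partitioning used by repartition lines up with the bucketing hash, so please verify the file counts on a small sample first) is to repartition by the bucket columns with the same number of partitions as buckets, so that each task ends up holding the rows of a single bucket:

```java
import static org.apache.spark.sql.functions.col;

// bucketColumn1 / bucketColumn2 are the same column-name Strings used above.
dataframe
    .repartition(500, col(bucketColumn1), col(bucketColumn2))
    .write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```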

Thanks