[
https://issues.apache.org/jira/browse/SPARK-35848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17384527#comment-17384527
]
Sai Polisetty commented on SPARK-35848:
---
Thanks for taking a look at it, Sean. I am using an [Azure
Standard_E8s_v3|https://docs.microsoft.com/en-us/azure/virtual-machines/ev3-esv3-series#esv3-series]
host that has 64GB of memory and 8 cores. While the cluster can handle
serialization of data beyond 2GB in size, the error in this particular case
comes from the hardcoded use of JavaSerializer for the zeroValue in
treeAggregate, which has a 2GB limit. I believe this happens
[here|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L215],
but I am not sure I am pointing my finger at the right place.
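To make the 2GB ceiling concrete, here is a minimal standalone sketch (no Spark involved; the payload sizes are just illustrative). java.io.ByteArrayOutputStream is backed by a single byte[], whose length is a signed 32-bit int, so once the requested capacity overflows past Integer.MAX_VALUE, hugeCapacity throws OutOfMemoryError, which is exactly the first frame in the stack trace below:
{code:scala}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Standalone sketch of the underlying limit. Run with a large heap, e.g. -Xmx8g,
// so the failure comes from the int capacity overflow rather than heap exhaustion.
object TwoGigabyteLimit {
  def main(args: Array[String]): Unit = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try {
      // Three ~800MB payloads push the buffer past the ~2.1GB byte[] ceiling.
      (1 to 3).foreach(_ => oos.writeObject(new Array[Byte](800 * 1024 * 1024)))
    } catch {
      // Catching OutOfMemoryError is only acceptable in a demo like this one.
      case e: OutOfMemoryError => println(s"Hit the 2GB limit: $e")
    }
  }
}
{code}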
> Spark Bloom Filter, others using treeAggregate can throw OutOfMemoryError
> -
>
> Key: SPARK-35848
> URL: https://issues.apache.org/jira/browse/SPARK-35848
> Project: Spark
> Issue Type: Bug
> Components: ML, Spark Core
> Affects Versions: 3.0.3, 3.1.2, 3.2.0
> Reporter: Sai Polisetty
> Assignee: Sean R. Owen
> Priority: Minor
>
> When the Bloom filter stat function is invoked on a large dataframe that
> requires a BitArray of size >2GB, it will fail with a
> java.lang.OutOfMemoryError. As mentioned in a similar bug, this is due to
> the zero value passed to treeAggregate. Irrespective of the
> spark.serializer setting, this zero value is serialized using
> JavaSerializer, which has a hard limit of 2GB. Using a solution similar to
> SPARK-26228 and setting spark.serializer to KryoSerializer can avoid this error.
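> One workaround shape (a hedged sketch, not the committed fix; bloomFilterNoBigZero is a hypothetical helper) is to construct the empty filter inside each partition, so the multi-GB zero value is never captured in a closure that the ClosureCleaner has to Java-serialize:
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.util.sketch.BloomFilter
>
> // Hypothetical user-level workaround: create the (huge) empty filter on the
> // executors instead of shipping it from the driver as treeAggregate's zeroValue.
> def bloomFilterNoBigZero(df: DataFrame, col: String,
>                          expectedNumItems: Long, fpp: Double): BloomFilter = {
>   df.select(col).rdd
>     .mapPartitions { rows =>
>       val bf = BloomFilter.create(expectedNumItems, fpp) // built on the executor
>       rows.foreach(r => bf.putLong(r.getLong(0)))        // assumes a LongType column
>       Iterator.single(bf)
>     }
>     .reduce((a, b) => a.mergeInPlace(b))
> }
> {code}
> The partial filters are still ~2GB each when they travel between executors and back to the driver, so spark.serializer=KryoSerializer and a generous spark.driver.maxResultSize remain advisable.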
>
> Steps to reproduce:
> {code:scala}
> val df = List.range(0, 10).toDF("Id")
> val expectedNumItems = 2000000000L // 2 billion
> val fpp = 0.03
> val bf = df.stat.bloomFilter("Id", expectedNumItems, fpp)
> {code}
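> For scale: the bit array Spark would allocate for these parameters is already close to the limit. Using the standard optimal-bits formula (which, to my understanding, is what BloomFilter.optimalNumOfBits computes):
> {code:scala}
> // m = -n * ln(p) / (ln 2)^2
> val n = 2000000000L // expected items
> val p = 0.03        // target false-positive rate
> val bits = (-n * math.log(p) / (math.log(2) * math.log(2))).toLong
> println(s"$bits bits = ${bits / 8 / 1e9} GB") // ~1.46e10 bits, ~1.82 GB raw
> {code}
> That is roughly 1.82GB of raw bit array before Java serialization framing and the rest of the closure graph are layered on top, at which point the single byte[] buffer overflows the signed 32-bit capacity limit.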
> Stack trace:
> java.lang.OutOfMemoryError
>   at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>   at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>   at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>   at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>   at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
>   at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>   at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2604)
>   at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$1(PairRDDFunctions.scala:86)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
>   at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:75)
>   at org.apache.spark.rdd.PairRDDFunctions.$anonfun$foldByKey$1(PairRDDFunctions.scala:218)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
>   at org.apache.spark.rdd.PairRDDFunctions.foldByKey(PairRDDFunctions.scala:207)
>   at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1224)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:125)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:395)
>   at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1203)
>   at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala