[ https://issues.apache.org/jira/browse/SPARK-24910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-24910.
----------------------------------
    Resolution: Incomplete

Spark Bloom Filter Closure Serialization improvement for very high volume of Data
---------------------------------------------------------------------------------

                 Key: SPARK-24910
                 URL: https://issues.apache.org/jira/browse/SPARK-24910
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.3.1
            Reporter: Himangshu Ranjan Borah
            Priority: Minor
              Labels: bulk-closed

I am proposing an improvement to the Bloom filter generation logic used in DataFrameStatFunctions' bloomFilter() API: build the filters with mapPartitions() instead of aggregate(), to avoid the closure serialization that fails for very large BitArrays.

Spark's stat functions build the Bloom filter with aggregate/treeAggregate operations whose closure depends on a Bloom filter created in the driver. Since Spark hard-codes the closure serializer to the Java serializer, closure cleanup fails for very large Bloom filters (typically with the number of items in the billions and an fpp around 0.001). The Kryo serializer works fine at that scale, but there were apparently issues with using Kryo for closure serialization, because of which Spark 2.0 hard-coded it to Java.
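To make the failure mode concrete, here is a minimal reproduction sketch (the app name and sizes are illustrative, and the comments paraphrase rather than quote the Spark source):

    import org.apache.spark.sql.SparkSession

    // With ~2 billion expected items and fpp = 0.001, the filter's BitArray is
    // several gigabytes. DataFrameStatFunctions.buildBloomFilter() hands that
    // driver-side filter to treeAggregate() as the zero value, the partition
    // aggregation closure captures it, and ClosureCleaner serializes the closure
    // with the hard-coded Java serializer, overflowing the roughly 2 GB limit of
    // the backing ByteArrayOutputStream.
    val spark = SparkSession.builder().appName("bloom-oom-repro").getOrCreate()
    val df = spark.range(0L, 2000000000L).toDF("id")

    // Fails with java.lang.OutOfMemoryError during closure cleanup, before any
    // data is actually scanned.
    val filter = df.stat.bloomFilter("id", 2000000000L, 0.001)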
The call stack we typically get looks like:

java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
        at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
        at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
        at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala:554)
        at org.apache.spark.sql.DataFrameStatFunctions.bloomFilter(DataFrameStatFunctions.scala:505)

This issue can be avoided if we *don't* use the *aggregate()* operations for Bloom filter generation and instead use a *mapPartitions()*-style operation, where the Bloom filters are created inside the executors (by giving them enough memory and controlling the number of executors and partitions) and only the final per-partition Bloom filters are returned to the driver. We can then merge just those filters, either in the driver itself or distributed via treeAggregate(). This way the Kryo serializer is used only for the returned Bloom filters; it works fine on big datasets and scales with the cluster specification. Please comment on this from your end, or let me know of any upcoming improvements on this issue that you might already be working on.
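For illustration, here is a rough sketch of the proposed approach. The helper name bloomFilterViaMapPartitions is hypothetical (not an existing Spark API), and it assumes a single non-null LongType column; other types would go through putString()/putBinary():

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.util.sketch.BloomFilter

    def bloomFilterViaMapPartitions(
        df: DataFrame,
        colName: String,
        expectedNumItems: Long,
        fpp: Double): BloomFilter = {
      val partial = df.select(colName).rdd.mapPartitions { rows =>
        // The filter is created on the executor, so the mapPartitions()
        // closure only captures two small numbers, never a BitArray.
        val filter = BloomFilter.create(expectedNumItems, fpp)
        rows.foreach(row => filter.putLong(row.getLong(0)))
        Iterator.single(filter)
      }
      // Only the finished per-partition filters travel back as task results
      // (which can be Kryo-serialized); the merge could also be distributed
      // with treeReduce() instead of reduce().
      partial.reduce(_ mergeInPlace _)
    }

    // Hypothetical usage, mirroring df.stat.bloomFilter("id", 2000000000L, 0.001):
    // val filter = bloomFilterViaMapPartitions(df, "id", 2000000000L, 0.001)

Sizing note: BloomFilter.create(expectedNumItems, fpp) is sized for the whole dataset, so each per-partition filter is as large as the final one; the point is that it lives in executor memory and in task results rather than in a Java-serialized closure.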