[ https://issues.apache.org/jira/browse/SPARK-24910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24910.
----------------------------------
    Resolution: Incomplete

> Spark Bloom Filter Closure Serialization improvement for very high volume of 
> Data
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-24910
>                 URL: https://issues.apache.org/jira/browse/SPARK-24910
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.1
>            Reporter: Himangshu Ranjan Borah
>            Priority: Minor
>              Labels: bulk-closed
>
> I am proposing an improvement to the Bloom filter generation logic used in 
> DataFrameStatFunctions' bloomFilter() API: use mapPartitions() instead of 
> aggregate() to avoid closure serialization, which fails for huge bit arrays.
> Spark's stat functions build the Bloom filter with aggregate/treeAggregate 
> operations whose closure depends on the Bloom filter created on the driver. 
> Since Spark hard-codes the closure serializer to the Java serializer, closure 
> cleanup fails for very large Bloom filters (typically with the number of items 
> in the billions and fpp ~ 0.001). The Kryo serializer works fine at that scale, 
> but there were apparently issues with using Kryo for closure serialization, 
> because of which Spark 2.0 hard-coded it to Java. The call stack we typically 
> get looks like:
> {noformat}
> java.lang.OutOfMemoryError
>     at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>     at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
>     at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
>     at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
>     at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
>     at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala:554)
>     at org.apache.spark.sql.DataFrameStatFunctions.bloomFilter(DataFrameStatFunctions.scala:505)
> {noformat}
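>
> For reference, a simplified sketch (*not* the actual Spark source) of the 
> aggregation pattern described above, showing how the driver-side filter ends up 
> inside the closure that treeAggregate() hands to the closure serializer:
> {code:scala}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.util.sketch.BloomFilter
>
> // The zero value is created on the driver; for ~billions of items at fpp ~ 0.001
> // its bit array can be gigabytes in size, and it is captured by the task closure.
> def buildViaTreeAggregate(rdd: RDD[Long], expectedNumItems: Long, fpp: Double): BloomFilter = {
>   val zero = BloomFilter.create(expectedNumItems, fpp)
>   rdd.treeAggregate(zero)(
>     (filter, value) => { filter.putLong(value); filter },
>     (f1, f2) => f1.mergeInPlace(f2)
>   )
> }
> {code}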
> This issue can be overcome if we *don't* use the *aggregate()* operations for 
> Bloom filter generation and instead use a *mapPartitions()*-style operation: 
> create the Bloom filters inside the executors (giving them enough memory and 
> controlling the number of executors and partitions), return one final Bloom 
> filter per partition to the driver, and then merge just those filters, either 
> on the driver itself or distributed via treeAggregate(). A sketch of this 
> approach is given below. This way the Kryo serializer only has to handle the 
> returned Bloom filters; this works fine on big datasets and scales with the 
> cluster specification. Please comment on this, or let me know of any upcoming 
> improvements on this issue that you might already be working on.
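>
> Below is a minimal sketch of the proposed mapPartitions()-based approach. It is 
> illustrative only (the helper name and signature are not part of Spark's API) 
> and assumes the per-partition filters can be shipped back as task results with 
> the configured serializer (e.g. Kryo):
> {code:scala}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.util.sketch.BloomFilter
>
> def buildViaMapPartitions(rdd: RDD[Long], expectedNumItems: Long, fpp: Double): BloomFilter = {
>   // Each partition builds its own filter locally inside the executor; the task
>   // closure captures only the two scalar parameters, never a large filter.
>   val perPartition: RDD[BloomFilter] = rdd.mapPartitions { iter =>
>     val filter = BloomFilter.create(expectedNumItems, fpp)
>     iter.foreach(filter.putLong)
>     Iterator.single(filter)
>   }
>   // Merge only the per-partition filters; they travel as task results rather
>   // than inside a closure, so the Java closure serializer is never involved.
>   perPartition.reduce((a, b) => a.mergeInPlace(b))
> }
> {code}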



