This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fd7acd32895 [SPARK-44238][CORE][SQL] Introduce a new `readFrom` method 
with byte array input for `BloomFilter`
fd7acd32895 is described below

commit fd7acd32895ed79094ff75aef8ff133966627ee4
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Thu Aug 31 23:19:41 2023 -0500

    [SPARK-44238][CORE][SQL] Introduce a new `readFrom` method with byte array 
input for `BloomFilter`
    
    ### What changes were proposed in this pull request?
    This PR introduces a new `readFrom` method with byte array input for 
`BloomFilter`.
    
    ### Why are the changes needed?
    To de-duplicate code: several call sites independently wrapped a byte array in a `ByteArrayInputStream` before calling `BloomFilter.readFrom`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    Closes #41781 from LuciferYang/bloomfilter-readFrom.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../main/java/org/apache/spark/util/sketch/BloomFilter.java    |  7 +++++++
 .../java/org/apache/spark/util/sketch/BloomFilterImpl.java     |  6 ++++++
 .../scala/org/apache/spark/util/sketch/BloomFilterSuite.scala  |  6 ++----
 .../sql/catalyst/expressions/BloomFilterMightContain.scala     | 10 +---------
 .../catalyst/expressions/aggregate/BloomFilterAggregate.scala  |  8 +-------
 5 files changed, 17 insertions(+), 20 deletions(-)

diff --git 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
index f3c2b05e7af..172b394689c 100644
--- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
+++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java
@@ -178,6 +178,13 @@ public abstract class BloomFilter {
     return BloomFilterImpl.readFrom(in);
   }
 
+  /**
+   * Reads in a {@link BloomFilter} from a byte array.
+   */
+  public static BloomFilter readFrom(byte[] bytes) throws IOException {
+    return BloomFilterImpl.readFrom(bytes);
+  }
+
   /**
    * Computes the optimal k (number of hashes per item inserted in Bloom 
filter), given the
    * expected insertions and total number of bits in the Bloom filter.
diff --git 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
index ccf1833af99..3fba5e33252 100644
--- 
a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
+++ 
b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java
@@ -266,6 +266,12 @@ class BloomFilterImpl extends BloomFilter implements 
Serializable {
     return filter;
   }
 
+  public static BloomFilterImpl readFrom(byte[] bytes) throws IOException {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
+      return readFrom(bis);
+    }
+  }
+
   private void writeObject(ObjectOutputStream out) throws IOException {
     writeTo(out);
   }
diff --git 
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
 
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
index cfdc9954772..4d0ba66637b 100644
--- 
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
+++ 
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.sketch
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.io.ByteArrayOutputStream
 
 import scala.reflect.ClassTag
 import scala.util.Random
@@ -34,9 +34,7 @@ class BloomFilterSuite extends AnyFunSuite { // 
scalastyle:ignore funsuite
     filter.writeTo(out)
     out.close()
 
-    val in = new ByteArrayInputStream(out.toByteArray)
-    val deserialized = BloomFilter.readFrom(in)
-    in.close()
+    val deserialized = BloomFilter.readFrom(out.toByteArray)
 
     assert(filter == deserialized)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
index b2273b6a6d1..784bea899c4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io.ByteArrayInputStream
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -119,11 +117,5 @@ case class BloomFilterMightContain(
     }
   }
 
-  final def deserialize(bytes: Array[Byte]): BloomFilter = {
-    val in = new ByteArrayInputStream(bytes)
-    val bloomFilter = BloomFilter.readFrom(in)
-    in.close()
-    bloomFilter
-  }
-
+  final def deserialize(bytes: Array[Byte]): BloomFilter = 
BloomFilter.readFrom(bytes)
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
index 7cba462ce2c..424e191a0c9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
-import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -227,12 +226,7 @@ object BloomFilterAggregate {
     out.toByteArray
   }
 
-  final def deserialize(bytes: Array[Byte]): BloomFilter = {
-    val in = new ByteArrayInputStream(bytes)
-    val bloomFilter = BloomFilter.readFrom(in)
-    in.close()
-    bloomFilter
-  }
+  final def deserialize(bytes: Array[Byte]): BloomFilter = 
BloomFilter.readFrom(bytes)
 }
 
 private trait BloomFilterUpdater {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to