This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fd7acd32895 [SPARK-44238][CORE][SQL] Introduce a new `readFrom` method with byte array input for `BloomFilter` fd7acd32895 is described below commit fd7acd32895ed79094ff75aef8ff133966627ee4 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Thu Aug 31 23:19:41 2023 -0500 [SPARK-44238][CORE][SQL] Introduce a new `readFrom` method with byte array input for `BloomFilter` ### What changes were proposed in this pull request? This PR introduces a new `readFrom` method with byte array input for `BloomFilter` ### Why are the changes needed? De-duplicate code ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #41781 from LuciferYang/bloomfilter-readFrom. Lead-authored-by: yangjie01 <yangji...@baidu.com> Co-authored-by: YangJie <yangji...@baidu.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../main/java/org/apache/spark/util/sketch/BloomFilter.java | 7 +++++++ .../java/org/apache/spark/util/sketch/BloomFilterImpl.java | 6 ++++++ .../scala/org/apache/spark/util/sketch/BloomFilterSuite.scala | 6 ++---- .../sql/catalyst/expressions/BloomFilterMightContain.scala | 10 +--------- .../catalyst/expressions/aggregate/BloomFilterAggregate.scala | 8 +------- 5 files changed, 17 insertions(+), 20 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index f3c2b05e7af..172b394689c 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -178,6 +178,13 @@ public abstract class BloomFilter { return BloomFilterImpl.readFrom(in); } + /** + * Reads in a {@link BloomFilter} from a byte array. 
+ */ + public static BloomFilter readFrom(byte[] bytes) throws IOException { + return BloomFilterImpl.readFrom(bytes); + } + /** * Computes the optimal k (number of hashes per item inserted in Bloom filter), given the * expected insertions and total number of bits in the Bloom filter. diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index ccf1833af99..3fba5e33252 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -266,6 +266,12 @@ class BloomFilterImpl extends BloomFilter implements Serializable { return filter; } + public static BloomFilterImpl readFrom(byte[] bytes) throws IOException { + try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) { + return readFrom(bis); + } + } + private void writeObject(ObjectOutputStream out) throws IOException { writeTo(out); } diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala index cfdc9954772..4d0ba66637b 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.sketch -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.io.ByteArrayOutputStream import scala.reflect.ClassTag import scala.util.Random @@ -34,9 +34,7 @@ class BloomFilterSuite extends AnyFunSuite { // scalastyle:ignore funsuite filter.writeTo(out) out.close() - val in = new ByteArrayInputStream(out.toByteArray) - val deserialized = BloomFilter.readFrom(in) - in.close() + val deserialized = BloomFilter.readFrom(out.toByteArray) assert(filter == deserialized) } diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala index b2273b6a6d1..784bea899c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BloomFilterMightContain.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.catalyst.expressions -import java.io.ByteArrayInputStream - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch @@ -119,11 +117,5 @@ case class BloomFilterMightContain( } } - final def deserialize(bytes: Array[Byte]): BloomFilter = { - val in = new ByteArrayInputStream(bytes) - val bloomFilter = BloomFilter.readFrom(in) - in.close() - bloomFilter - } - + final def deserialize(bytes: Array[Byte]): BloomFilter = BloomFilter.readFrom(bytes) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala index 7cba462ce2c..424e191a0c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.expressions.aggregate -import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import org.apache.spark.sql.catalyst.InternalRow @@ -227,12 +226,7 @@ object BloomFilterAggregate { out.toByteArray } - final def deserialize(bytes: Array[Byte]): BloomFilter = { - val in = new ByteArrayInputStream(bytes) - val bloomFilter = BloomFilter.readFrom(in) - 
in.close() - bloomFilter - } + final def deserialize(bytes: Array[Byte]): BloomFilter = BloomFilter.readFrom(bytes) } private trait BloomFilterUpdater { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org