This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new cf60384  [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe
cf60384 is described below

commit cf6038499d3d5686ab3377f550c2fe507d022fd3
Author: herman <her...@databricks.com>
AuthorDate: Tue Apr 21 18:17:19 2020 -0700

    [SPARK-31511][SQL] Make BytesToBytesMap iterators thread-safe

    ### What changes were proposed in this pull request?
    This PR increases the thread safety of the `BytesToBytesMap`:
    - It makes the `iterator()` and `destructiveIterator()` methods use their own `Location` object. This used to be shared, which caused issues when the map was being iterated over in two threads by two different iterators.
    - Removes the `safeIterator()` function. This is not needed anymore.
    - Improves the documentation of a couple of methods w.r.t. thread-safety.

    ### Why are the changes needed?
    It is unexpected that an iterator shares the object it is returning with all other iterators. This is a violation of the iterator contract, and it causes issues with iterators over a map that are consumed in different threads.

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    Existing tests.

    Closes #28286 from hvanhovell/SPARK-31511.

    Authored-by: herman <her...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 23 +++++++++-------------
 .../spark/sql/execution/joins/HashedRelation.scala |  2 +-
 2 files changed, 10 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 64c240c..6e02888 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -406,17 +406,10 @@ public final class BytesToBytesMap extends MemoryConsumer {
    *
    * For efficiency, all calls to `next()` will return the same {@link Location} object.
    *
-   * If any other lookups or operations are performed on this map while iterating over it, including
-   * `lookup()`, the behavior of the returned iterator is undefined.
+   * The returned iterator is thread-safe. However if the map is modified while iterating over it,
+   * the behavior of the returned iterator is undefined.
    */
   public MapIterator iterator() {
-    return new MapIterator(numValues, loc, false);
-  }
-
-  /**
-   * Returns a thread safe iterator that iterates of the entries of this map.
-   */
-  public MapIterator safeIterator() {
     return new MapIterator(numValues, new Location(), false);
   }
 
@@ -427,19 +420,20 @@ public final class BytesToBytesMap extends MemoryConsumer {
    *
    * For efficiency, all calls to `next()` will return the same {@link Location} object.
    *
-   * If any other lookups or operations are performed on this map while iterating over it, including
-   * `lookup()`, the behavior of the returned iterator is undefined.
+   * The returned iterator is thread-safe. However if the map is modified while iterating over it,
+   * the behavior of the returned iterator is undefined.
    */
   public MapIterator destructiveIterator() {
     updatePeakMemoryUsed();
-    return new MapIterator(numValues, loc, true);
+    return new MapIterator(numValues, new Location(), true);
   }
 
   /**
    * Looks up a key, and return a {@link Location} handle that can be used to test existence
    * and read/write values.
    *
-   * This function always return the same {@link Location} instance to avoid object allocation.
+   * This function always returns the same {@link Location} instance to avoid object allocation.
+   * This function is not thread-safe.
    */
   public Location lookup(Object keyBase, long keyOffset, int keyLength) {
     safeLookup(keyBase, keyOffset, keyLength, loc,
@@ -451,7 +445,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
    * Looks up a key, and return a {@link Location} handle that can be used to test existence
    * and read/write values.
    *
-   * This function always return the same {@link Location} instance to avoid object allocation.
+   * This function always returns the same {@link Location} instance to avoid object allocation.
+   * This function is not thread-safe.
    */
   public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
     safeLookup(keyBase, keyOffset, keyLength, loc, hash);
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 4001338..13180d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -177,7 +177,7 @@ private[joins] class UnsafeHashedRelation(
   }
 
   override def keys(): Iterator[InternalRow] = {
-    val iter = binaryMap.safeIterator()
+    val iter = binaryMap.iterator()
 
     new Iterator[InternalRow] {
       val unsafeRow = new UnsafeRow(numKeys)
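
Illustrative note: the aliasing problem described in the commit message can be reproduced in a much smaller setting. The sketch below is a simplified model and not Spark's code. `Holder`, `TinyMap`, and `HolderIterator` are hypothetical stand-ins for `Location`, `BytesToBytesMap`, and `MapIterator`. To keep the result deterministic it interleaves two iterators on a single thread instead of using two threads; the overwritten result is the same effect that two threads would see at unpredictable times.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Simplified model of the shared-holder issue; this is NOT Spark's BytesToBytesMap. */
final class SharedHolderSketch {

  /** Hypothetical stand-in for Location: a reusable, mutable result holder. */
  static final class Holder {
    int value;
  }

  /** Hypothetical stand-in for the map: a fixed array of values. */
  static final class TinyMap {
    private final int[] values;
    private final Holder shared = new Holder(); // map-wide holder, like the old `loc` field

    TinyMap(int... values) {
      this.values = values.clone();
    }

    /** Models the old behavior: every iterator writes into the same map-wide holder. */
    Iterator<Holder> sharedHolderIterator() {
      return new HolderIterator(values, shared);
    }

    /** Models the new behavior: each iterator gets a holder of its own. */
    Iterator<Holder> ownHolderIterator() {
      return new HolderIterator(values, new Holder());
    }
  }

  /** Tracks its own position, but reuses one holder for every result it returns. */
  static final class HolderIterator implements Iterator<Holder> {
    private final int[] values;
    private final Holder holder;
    private int next = 0;

    HolderIterator(int[] values, Holder holder) {
      this.values = values;
      this.holder = holder;
    }

    @Override
    public boolean hasNext() {
      return next < values.length;
    }

    @Override
    public Holder next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      holder.value = values[next++];
      return holder;
    }
  }

  /** Advances `first` once, then `second` twice, and reads back the result `first` returned. */
  static int readAfterInterleaving(Iterator<Holder> first, Iterator<Holder> second) {
    Holder fromFirst = first.next(); // holds the first value at this point
    second.next();
    second.next();                   // the second iterator has now written the second value
    return fromFirst.value;
  }

  public static void main(String[] args) {
    TinyMap map = new TinyMap(10, 20, 30);
    // Shared holder: the first iterator's result was overwritten by the second iterator.
    System.out.println(
        readAfterInterleaving(map.sharedHolderIterator(), map.sharedHolderIterator())); // 20
    // Own holder: the first iterator's result is unaffected.
    System.out.println(
        readAfterInterleaving(map.ownHolderIterator(), map.ownHolderIterator())); // 10
  }
}
```

In this model each iterator still reuses its own holder across `next()` calls, which mirrors the Javadoc statement in the diff that all calls to `next()` return the same `Location` object. Only the sharing between different iterators is removed.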