This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3e5407dbbfb [SPARK-38809][SS] Implement option to skip null values in 
symmetric hash implementation of stream-stream joins
3e5407dbbfb is described below

commit 3e5407dbbfb8ea955e9c44df1893ad24a9449a28
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Fri Apr 8 12:19:50 2022 +0900

    [SPARK-38809][SS] Implement option to skip null values in symmetric hash 
implementation of stream-stream joins
    
    ### What changes were proposed in this pull request?
    
    In the symmetric has join state manager, we can receive entries with null 
values for a key and that can cause the `removeByValue` and get iterators to 
fail and run into the NullPointerException. This is possible if the state 
recovered is written from an old spark version or its corrupted on disk or due 
to issues with the iterators. Since we don't have a utility to query this 
state, we would like to provide a conf option to skip nulls for the symmetric 
hash implementation in stream str [...]
    
    ### Why are the changes needed?
    
    Without these changes, if we encounter null values for stream-stream joins, 
the executor task will repeatedly fail with NullPointerException and will 
terminate the stage and eventually the query as well. This change allows the 
user to set a config option to continue iterating by skipping null values for 
symmetric hash based implementation of stream-stream joins.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit tests to test the new functionality by adding nulls in between 
and forcing the iteration/get calls with nulls in the mix and tested the 
behavior with the config disabled as well as enabled.
    Sample output:
    ```
    [info] SymmetricHashJoinStateManagerSuite:
    15:07:50.627 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
    [info] - StreamingJoinStateManager V1 - all operations (588 milliseconds)
    [info] - StreamingJoinStateManager V2 - all operations (251 milliseconds)
    15:07:52.669 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=4.
    15:07:52.671 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=3.
    15:07:52.672 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=1 and endIndex=3.
    15:07:52.672 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=1 and endIndex=1.
    [info] - StreamingJoinStateManager V1 - all operations with nulls (252 
milliseconds)
    15:07:52.896 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=4.
    15:07:52.897 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=3.
    15:07:52.898 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=1 and endIndex=3.
    15:07:52.898 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=1 and endIndex=1.
    [info] - StreamingJoinStateManager V2 - all operations with nulls (221 
milliseconds)
    15:07:53.114 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=5 and endIndex=6.
    15:07:53.116 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=6.
    15:07:53.331 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=5 and endIndex=6.
    15:07:53.331 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=3.
    [info] - StreamingJoinStateManager V1 - all operations with nulls in middle 
(435 milliseconds)
    15:07:53.549 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=5 and endIndex=6.
    15:07:53.551 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=6.
    15:07:53.785 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=5 and endIndex=6.
    15:07:53.785 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=3.
    [info] - StreamingJoinStateManager V2 - all operations with nulls in middle 
(456 milliseconds)
    [info] - SPARK-35689: StreamingJoinStateManager V1 - printable key of 
keyWithIndexToValue (390 milliseconds)
    [info] - SPARK-35689: StreamingJoinStateManager V2 - printable key of 
keyWithIndexToValue (216 milliseconds)
    15:07:54.640 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite:
    
    ===== POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite, 
threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =====
    [info] Run completed in 5 seconds, 714 milliseconds.
    [info] Total number of tests run: 8
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 8, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    Closes #36090 from anishshri-db/bfix/SPARK-38809.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 61c489ea7ef51d7d0217f770ec358ed7a7b76b42)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 16 ++++
 .../execution/streaming/state/StateStoreConf.scala |  3 +
 .../state/SymmetricHashJoinStateManager.scala      | 38 ++++++---
 .../state/SymmetricHashJoinStateManagerSuite.scala | 91 ++++++++++++++++++----
 4 files changed, 123 insertions(+), 25 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index fbd56968c1d..9b7d4aee745 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1897,6 +1897,19 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  /**
+   * SPARK-38809 - Config option to allow skipping null values for hash based 
stream-stream joins.
+   * Its possible for us to see nulls if state was written with an older 
version of Spark,
+   * the state was corrupted on disk or if we had an issue with the state 
iterators.
+   */
+  val STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS =
+  
buildConf("spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled")
+    .internal()
+    .doc("When true, this config will skip null values in hash based 
stream-stream joins.")
+    .version("3.3.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val VARIABLE_SUBSTITUTE_ENABLED =
     buildConf("spark.sql.variable.substitute")
       .doc("This enables substitution using syntax like `${var}`, 
`${system:var}`, " +
@@ -3866,6 +3879,9 @@ class SQLConf extends Serializable with Logging {
 
   def stateStoreFormatValidationEnabled: Boolean = 
getConf(STATE_STORE_FORMAT_VALIDATION_ENABLED)
 
+  def stateStoreSkipNullsForStreamStreamJoins: Boolean =
+    getConf(STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS)
+
   def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
 
   def isUnsupportedOperationCheckEnabled: Boolean = 
getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 58af8272d1c..529db2609cd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -52,6 +52,9 @@ class StateStoreConf(
   val formatValidationCheckValue: Boolean =
     
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, 
"true") == "true"
 
+  /** Whether to skip null values for hash based stream-stream joins. */
+  val skipNullsForStreamStreamJoins: Boolean = 
sqlConf.stateStoreSkipNullsForStreamStreamJoins
+
   /** The compression codec used to compress delta and snapshot files. */
   val compressionCodec: String = sqlConf.stateStoreCompressionCodec
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 56c47d564a3..d17c6e8e862 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -222,8 +222,12 @@ class SymmetricHashJoinStateManager(
         valueRemoved = false
       }
 
-      // Find the next value satisfying the condition, updating `currentKey` 
and `numValues` if
-      // needed. Returns null when no value can be found.
+      /**
+       * Find the next value satisfying the condition, updating `currentKey` 
and `numValues` if
+       * needed. Returns null when no value can be found.
+       * Note that we will skip nulls explicitly if config setting for the 
same is
+       * set to true via STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.
+       */
       private def findNextValueForIndex(): ValueAndMatchPair = {
         // Loop across all values for the current key, and then all other 
keys, until we find a
         // value satisfying the removal condition.
@@ -233,7 +237,9 @@ class SymmetricHashJoinStateManager(
           if (hasMoreValuesForCurrentKey) {
             // First search the values for the current key.
             val valuePair = keyWithIndexToValue.get(currentKey, index)
-            if (removalCondition(valuePair.value)) {
+            if (valuePair == null && storeConf.skipNullsForStreamStreamJoins) {
+              index += 1
+            } else if (removalCondition(valuePair.value)) {
               return valuePair
             } else {
               index += 1
@@ -597,22 +603,30 @@ class SymmetricHashJoinStateManager(
     /**
      * Get all values and indices for the provided key.
      * Should not return null.
+     * Note that we will skip nulls explicitly if config setting for the same 
is
+     * set to true via STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.
      */
     def getAll(key: UnsafeRow, numValues: Long): 
Iterator[KeyWithIndexAndValue] = {
-      val keyWithIndexAndValue = new KeyWithIndexAndValue()
-      var index = 0
       new NextIterator[KeyWithIndexAndValue] {
+        private val keyWithIndexAndValue = new KeyWithIndexAndValue()
+        private var index: Long = 0L
+
+        private def hasMoreValues = index < numValues
         override protected def getNext(): KeyWithIndexAndValue = {
-          if (index >= numValues) {
-            finished = true
-            null
-          } else {
+          while (hasMoreValues) {
             val keyWithIndex = keyWithIndexRow(key, index)
             val valuePair = 
valueRowConverter.convertValue(stateStore.get(keyWithIndex))
-            keyWithIndexAndValue.withNew(key, index, valuePair)
-            index += 1
-            keyWithIndexAndValue
+            if (valuePair == null && storeConf.skipNullsForStreamStreamJoins) {
+              index += 1
+            } else {
+              keyWithIndexAndValue.withNew(key, index, valuePair)
+              index += 1
+              return keyWithIndexAndValue
+            }
           }
+
+          finished = true
+          return null
         }
 
         override protected def close(): Unit = {}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index deeebe1fc42..30d39ebcc4a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -52,6 +53,12 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest 
with BeforeAndAfter
     }
   }
 
+  SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
+    test(s"StreamingJoinStateManager V${version} - all operations with nulls 
in middle") {
+      testAllOperationsWithNullsInMiddle(version)
+    }
+  }
+
   SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
     test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
         "printable key of keyWithIndexToValue") {
@@ -167,6 +174,55 @@ class SymmetricHashJoinStateManagerSuite extends 
StreamTest with BeforeAndAfter
     }
   }
 
+  /* Test removeByValue with nulls in middle simulated by updating numValues 
on the state manager */
+  private def testAllOperationsWithNullsInMiddle(stateFormatVersion: Int): 
Unit = {
+    // Test with skipNullsForStreamStreamJoins set to false which would throw a
+    // NullPointerException while iterating and also return null values as 
part of get
+    withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) 
{ manager =>
+      implicit val mgr = manager
+
+      val ex = intercept[Exception] {
+        appendAndTest(40, 50, 200, 300)
+        assert(numRows === 3)
+        updateNumValues(40, 4) // create a null at the end
+        append(40, 400)
+        updateNumValues(40, 7) // create nulls in between and end
+        removeByValue(50)
+      }
+      assert(ex.isInstanceOf[NullPointerException])
+      assert(getNumValues(40) === 7)        // we should get 7 with no nulls 
skipped
+
+      removeByValue(300)
+      assert(getNumValues(40) === 1)         // only 400 should remain
+      assert(get(40) === Seq(400))
+      removeByValue(400)
+      assert(get(40) === Seq.empty)
+      assert(numRows === 0)                        // ensure all elements 
removed
+    }
+
+    // Test with skipNullsForStreamStreamJoins set to true which would skip 
nulls
+    // and continue iterating as part of removeByValue as well as get
+    withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion, 
true) { manager =>
+      implicit val mgr = manager
+
+      appendAndTest(40, 50, 200, 300)
+      assert(numRows === 3)
+      updateNumValues(40, 4) // create a null at the end
+      append(40, 400)
+      updateNumValues(40, 7) // create nulls in between and end
+
+      removeByValue(50)
+      assert(getNumValues(40) === 3)       // we should now get (400, 200, 
300) with nulls skipped
+
+      removeByValue(300)
+      assert(getNumValues(40) === 1)         // only 400 should remain
+      assert(get(40) === Seq(400))
+      removeByValue(400)
+      assert(get(40) === Seq.empty)
+      assert(numRows === 0)                        // ensure all elements 
removed
+    }
+  }
+
   val watermarkMetadata = new 
MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build()
   val inputValueSchema = new StructType()
     .add(StructField("time", IntegerType, metadata = watermarkMetadata))
@@ -205,6 +261,11 @@ class SymmetricHashJoinStateManagerSuite extends 
StreamTest with BeforeAndAfter
     manager.updateNumValuesTestOnly(toJoinKeyRow(key), numValues)
   }
 
+  def getNumValues(key: Int)
+                  (implicit manager: SymmetricHashJoinStateManager): Int = {
+    manager.get(toJoinKeyRow(key)).size
+  }
+
   def get(key: Int)(implicit manager: SymmetricHashJoinStateManager): Seq[Int] 
= {
     manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted
   }
@@ -232,22 +293,26 @@ class SymmetricHashJoinStateManagerSuite extends 
StreamTest with BeforeAndAfter
     manager.metrics.numKeys
   }
 
-
   def withJoinStateManager(
-    inputValueAttribs: Seq[Attribute],
-    joinKeyExprs: Seq[Expression],
-    stateFormatVersion: Int)(f: SymmetricHashJoinStateManager => Unit): Unit = 
{
+      inputValueAttribs: Seq[Attribute],
+      joinKeyExprs: Seq[Expression],
+      stateFormatVersion: Int,
+      skipNullsForStreamStreamJoins: Boolean = false)
+      (f: SymmetricHashJoinStateManager => Unit): Unit = {
 
     withTempDir { file =>
-      val storeConf = new StateStoreConf()
-      val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath, 
UUID.randomUUID, 0, 0, 5)
-      val manager = new SymmetricHashJoinStateManager(
-        LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo), storeConf, 
new Configuration,
-        partitionId = 0, stateFormatVersion)
-      try {
-        f(manager)
-      } finally {
-        manager.abortIfNeeded()
+      withSQLConf(SQLConf.STATE_STORE_SKIP_NULLS_FOR_STREAM_STREAM_JOINS.key ->
+        skipNullsForStreamStreamJoins.toString) {
+        val storeConf = new StateStoreConf(spark.sqlContext.conf)
+        val stateInfo = StatefulOperatorStateInfo(file.getAbsolutePath, 
UUID.randomUUID, 0, 0, 5)
+        val manager = new SymmetricHashJoinStateManager(
+          LeftSide, inputValueAttribs, joinKeyExprs, Some(stateInfo), 
storeConf, new Configuration,
+          partitionId = 0, stateFormatVersion)
+        try {
+          f(manager)
+        } finally {
+          manager.abortIfNeeded()
+        }
       }
     }
     StateStore.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to