This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 7e542f164a2f17501b153a2d7b8c53636a5bb9b8
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Fri Apr 1 18:21:48 2022 +0900

    [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with 
RocksDB state store provider
    
    (Credit to alex-balikov for the inspiration of the root cause observation, 
and anishshri-db for looking into the issue together.)
    
    This PR fixes the correctness issue on stream-stream outer join with 
RocksDB state store provider, which can occur in certain condition, like below:
    
    * stream-stream time interval outer join
      * left outer join has an issue on left side, right outer join has an 
issue on right side, full outer join has an issue on both sides
    * At batch N, produce non-late row(s) on the problematic side
    * At the same batch (batch N), some row(s) on the problematic side are 
evicted by the condition of watermark
    
    The root cause is same as 
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320) - weak read 
consistency on iterator, especially with RocksDB state store provider. (Quoting 
from SPARK-38320: The problem is due to the StateStore.iterator not reflecting 
StateStore changes made after its creation.)
    
    More specifically, if updates are performed during processing input rows 
and somehow updates the number of values for grouping key, the update is not 
seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method 
does the eviction with the number of values in out of sync.
    
    Making it more worse, if the method performs the eviction and updates the 
number of values for grouping key, it "overwrites" the number of value, 
effectively drop all rows being inserted in the same batch.
    
    Below code blocks are references on understanding the details of the issue.
    
    
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
    
    
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
    
    
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
    
    
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223
    
    This PR fixes the outer iterators as late evaluation to ensure all updates 
on processing input rows are reflected "before" outer iterators are initialized.
    
    The bug is described in above section.
    
    No.
    
    New UT added.
    
    Closes #36002 from HeartSaVioR/SPARK-38684.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/StreamingSymmetricHashJoinExec.scala | 81 ++++++++++++++++------
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 63 ++++++++++++++++-
 2 files changed, 121 insertions(+), 23 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 616ae08..30d87af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -318,17 +318,22 @@ case class StreamingSymmetricHashJoinExec(
           }
         }
 
+        val initIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, 
kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
         // NOTE: we need to make sure `outerOutputIter` is evaluated "after" 
exhausting all of
-        // elements in `innerOutputIter`, because evaluation of 
`innerOutputIter` may update
-        // the match flag which the logic for outer join is relying on.
-        val removedRowIter = leftSideJoiner.removeOldState()
-        val outerOutputIter = removedRowIter.filterNot { kv =>
-          stateFormatVersion match {
-            case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, 
kv.value))
-            case 2 => kv.matched
-            case _ => throwBadStateFormatVersionException()
-          }
-        }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        // elements in `hashJoinOutputIter`, otherwise it may lead to out of 
sync according to
+        // the interface contract on StateStore.iterator and end up with 
correctness issue.
+        // Please refer SPARK-38684 for more details.
+        val outerOutputIter = new 
LazilyInitializingJoinedRowIterator(initIterFn)
 
         hashJoinOutputIter ++ outerOutputIter
       case RightOuter =>
@@ -338,14 +343,23 @@ case class StreamingSymmetricHashJoinExec(
             
postJoinFilter(joinedRow.withLeft(leftValue).withRight(rightKeyValue.value))
           }
         }
-        val removedRowIter = rightSideJoiner.removeOldState()
-        val outerOutputIter = removedRowIter.filterNot { kv =>
-          stateFormatVersion match {
-            case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, 
kv.value))
-            case 2 => kv.matched
-            case _ => throwBadStateFormatVersionException()
-          }
-        }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+        val initIterFn = { () =>
+          val removedRowIter = rightSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, 
kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+        }
+
+        // NOTE: we need to make sure `outerOutputIter` is evaluated "after" 
exhausting all of
+        // elements in `hashJoinOutputIter`, otherwise it may lead to out of 
sync according to
+        // the interface contract on StateStore.iterator and end up with 
correctness issue.
+        // Please refer SPARK-38684 for more details.
+        val outerOutputIter = new 
LazilyInitializingJoinedRowIterator(initIterFn)
 
         hashJoinOutputIter ++ outerOutputIter
       case FullOuter =>
@@ -354,10 +368,25 @@ case class StreamingSymmetricHashJoinExec(
             case 2 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }
-        val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(
-          isKeyToValuePairMatched).map(pair => 
joinedRow.withLeft(pair.value).withRight(nullRight))
-        val rightSideOutputIter = rightSideJoiner.removeOldState().filterNot(
-          isKeyToValuePairMatched).map(pair => 
joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+        val leftSideInitIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot(isKeyToValuePairMatched)
+            .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
+        val rightSideInitIterFn = { () =>
+          val removedRowIter = rightSideJoiner.removeOldState()
+          removedRowIter.filterNot(isKeyToValuePairMatched)
+            .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+        }
+
+        // NOTE: we need to make sure both `leftSideOutputIter` and 
`rightSideOutputIter` are
+        // evaluated "after" exhausting all of elements in 
`hashJoinOutputIter`, otherwise it may
+        // lead to out of sync according to the interface contract on 
StateStore.iterator and
+        // end up with correctness issue. Please refer SPARK-38684 for more 
details.
+        val leftSideOutputIter = new 
LazilyInitializingJoinedRowIterator(leftSideInitIterFn)
+        val rightSideOutputIter = new 
LazilyInitializingJoinedRowIterator(rightSideInitIterFn)
 
         hashJoinOutputIter ++ leftSideOutputIter ++ rightSideOutputIter
       case _ => throwBadJoinTypeException()
@@ -632,4 +661,12 @@ case class StreamingSymmetricHashJoinExec(
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan, newRight: SparkPlan): StreamingSymmetricHashJoinExec 
=
     copy(left = newLeft, right = newRight)
+
+  private class LazilyInitializingJoinedRowIterator(
+      initFn: () => Iterator[JoinedRow]) extends Iterator[JoinedRow] {
+    private lazy val iter: Iterator[JoinedRow] = initFn()
+
+    override def hasNext: Boolean = iter.hasNext
+    override def next(): JoinedRow = iter.next()
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 5ec47bb..e65c791 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -32,7 +32,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec, 
StreamingSymmetricHashJoinHelper}
-import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreProviderId}
+import 
org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, 
StateStore, StateStoreProviderId}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -1168,6 +1168,67 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite 
{
       CheckNewAnswer(expectedOutput.head, expectedOutput.tail: _*)
     )
   }
+
+  test("SPARK-38684: outer join works correctly even if processing input rows 
and " +
+    "evicting state rows for same grouping key happens in the same 
micro-batch") {
+
+    // The test is to demonstrate the correctness issue in outer join before 
SPARK-38684.
+    withSQLConf(
+      SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+
+      val input1 = MemoryStream[(Timestamp, String, String)]
+      val df1 = input1.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val input2 = MemoryStream[(Timestamp, String, String)]
+      val df2 = input2.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val joined = df1.as("left")
+        .join(df2.as("right"),
+          expr("""
+                 |left.id = right.id AND left.eventTime BETWEEN
+                 |  right.eventTime - INTERVAL 30 seconds AND
+                 |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+          joinType = "leftOuter")
+
+      testStream(joined)(
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left 
in batch 1"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 00:01:00"), "abc", 
"right in batch 1")))
+        ),
+        CheckNewAnswer(),
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left 
in batch 2"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 01:01:00"), "abc", 
"right in batch 2")))
+        ),
+        // watermark advanced to "2020-01-02 00:00:00"
+        CheckNewAnswer(),
+        AddData(input1, (Timestamp.valueOf("2020-01-02 01:30:00"), "abc", 
"left in batch 3")),
+        // watermark advanced to "2020-01-02 01:00:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1", 
null, null, null)
+        ),
+        // left side state should still contain "left in batch 2" and "left in 
batch 3"
+        // we should see both rows in the left side since
+        // - "left in batch 2" is going to be evicted in this batch
+        // - "left in batch 3" is going to be matched with new row in right 
side
+        AddData(input2,
+          (Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left 
in batch 3")),
+        // watermark advanced to "2020-01-02 01:01:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2",
+            null, null, null),
+          (Timestamp.valueOf("2020-01-02 01:30:00"), "abc", "left in batch 3",
+            Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left 
in batch 3")
+        )
+      )
+    }
+  }
 }
 
 class StreamingFullOuterJoinSuite extends StreamingJoinSuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to