This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 83f0423  [SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row
83f0423 is described below

commit 83f0423ed91388518345cc4e66a552195bcf6f7a
Author: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Jul 9 07:37:06 2020 +0000

    [SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row
    
    ### What changes were proposed in this pull request?
    
    This patch fixes the incorrect join results produced by stream-stream join with state store format V2.
    
    There are some spots on the V2 path which leverage UnsafeProjection. Since the result row is reused, it should be copied to avoid its value changing while it is being read (or the caller must be verified to be unaffected by such reuse), but `SymmetricHashJoinStateManager.removeByValueCondition` violates this contract.
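
    A minimal sketch of the reuse hazard, using Spark's internal `UnsafeProjection` API (the single-column schema and values below are made up for illustration; this is not part of the patch):

    ```scala
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Made-up single-column schema, purely for illustration.
    val schema = StructType(Seq(StructField("id", IntegerType)))
    val proj = UnsafeProjection.create(schema)

    // The projection writes into an internal buffer and returns the same UnsafeRow instance
    // on every call, so a reference captured earlier silently changes on the next apply.
    val first: UnsafeRow = proj(InternalRow(1))
    val second: UnsafeRow = proj(InternalRow(2))   // `first` now reads 2 as well

    // copy() detaches the row from the reused buffer, keeping the captured value stable.
    val detached: UnsafeRow = proj(InternalRow(3)).copy()
    ```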
    
    This patch makes `KeyWithIndexToValueRowConverterV2.convertValue` copy the row itself, so that callers don't need to take care of it. It doesn't change the behavior of `KeyWithIndexToValueRowConverterV2.convertToValueRow`, to avoid double-copying: the caller is expected to store the returned row into the state store, whose implementation will call `copy()`.
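
    Schematically, the fixed V2 read path follows the pattern below (a simplified sketch, not the actual patch; the attribute names and single-column value schema are assumptions for illustration):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, UnsafeProjection, UnsafeRow}
    import org.apache.spark.sql.types.{BooleanType, IntegerType}

    // Assumed layout: the stored row is (value columns ++ matched flag).
    val valueAttrs: Seq[Attribute] = Seq(AttributeReference("id", IntegerType)())
    val matchedAttr: Attribute = AttributeReference("matched", BooleanType)()
    val storedAttrs: Seq[Attribute] = valueAttrs :+ matchedAttr

    // Projects the value columns back out of the stored row; the projection reuses its output buffer.
    val valueRowGenerator = UnsafeProjection.create(valueAttrs, storedAttrs)

    def convertValue(row: UnsafeRow): (UnsafeRow, Boolean) = {
      // copy() detaches the result from the reused buffer, so callers may hold onto it safely.
      (valueRowGenerator(row).copy(), row.getBoolean(valueAttrs.length))
    }
    // The write-side counterpart (convertToValueRow) intentionally skips copy(): the state store
    // copies the row on put, so copying here would be redundant.
    ```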
    
    This patch documents this behavior in each method doc of `KeyWithIndexToValueRowConverter`, so that future contributors can read through it and make sure that changes or new additions don't break the contract.
    
    ### Why are the changes needed?
    
    Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which produces nondeterministic results.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Spark 3.0.0 users who run stream-stream joins from a new checkpoint (the bug exists only in the V2 format path) may encounter wrong join results. This patch fixes it.
    
    ### How was this patch tested?
    
    The reported case was converted into a new UT, which was confirmed to pass. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite passed as well.
    
    Closes #28975 from HeartSaVioR/SPARK-32148.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensou...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 526cb2d1ba2b4c07e10d7011367fdef24a40a927)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../streaming/StreamingSymmetricHashJoinExec.scala |  4 ++
 .../state/SymmetricHashJoinStateManager.scala      | 17 ++++++++-
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 44 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 198e17d..57e62dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -295,6 +295,10 @@ case class StreamingSymmetricHashJoinExec(
             postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
           }
         }
+
+        // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
+        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // the match flag which the logic for outer join is relying on.
         val removedRowIter = leftSideJoiner.removeOldState()
         val outerOutputIter = removedRowIter.filterNot { kv =>
           stateFormatVersion match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 1a0a43c..1a5b50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -451,10 +451,25 @@ class SymmetricHashJoinStateManager(
   }
 
   private trait KeyWithIndexToValueRowConverter {
+    /** Defines the schema of the value row (the value side of K-V in state store). */
     def valueAttributes: Seq[Attribute]
 
+    /**
+     * Convert the value row to (actual value, match) pair.
+     *
+     * NOTE: implementations should ensure the result row is NOT reused during execution, so
+     * that caller can safely read the value in any time.
+     */
     def convertValue(value: UnsafeRow): ValueAndMatchPair
 
+    /**
+     * Build the value row from (actual value, match) pair. This is expected to be called just
+     * before storing to the state store.
+     * before storing to the state store.
+     *
+     * NOTE: depending on the implementation, the result row "may" be reused during execution
+     * (to avoid initialization of object), so the caller should ensure that the logic doesn't
+     * affect by such behavior. Call copy() against the result row if needed.
+     */
     def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
   }
 
@@ -493,7 +508,7 @@ class SymmetricHashJoinStateManager(
 
     override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
       if (value != null) {
-        ValueAndMatchPair(valueRowGenerator(value),
+        ValueAndMatchPair(valueRowGenerator(value).copy(),
           value.getBoolean(indexOrdinalInValueWithMatchedRow))
       } else {
         null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3f218c9..7837b20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.sql.Timestamp
 import java.util.{Locale, UUID}
 
 import scala.util.Random
@@ -991,4 +992,47 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
+
+  test("SPARK-32148 stream-stream join regression on Spark 3.0.0") {
+    val input1 = MemoryStream[(Timestamp, String, String)]
+    val df1 = input1.toDF
+      .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+      .withWatermark(s"eventTime", "2 minutes")
+
+    val input2 = MemoryStream[(Timestamp, String, String)]
+    val df2 = input2.toDF
+      .selectExpr("_1 as eventTime", "_2 as id", "_3 as name")
+      .withWatermark(s"eventTime", "4 minutes")
+
+    val joined = df1.as("left")
+      .join(df2.as("right"),
+        expr("""
+               |left.id = right.id AND left.eventTime BETWEEN
+               |  right.eventTime - INTERVAL 30 seconds AND
+               |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+        joinType = "leftOuter")
+
+    val inputDataForInput1 = Seq(
+      (Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
+      (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
+      (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B"))
+
+    val inputDataForInput2 = Seq(
+      (Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
+      (Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"),
+      (Timestamp.valueOf("2020-01-02 02:00:00"), "abc", "C"))
+
+    val expectedOutput = Seq(
+      (Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner", 
null, null, null),
+      (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A",
+        Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
+      (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B",
+        Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"))
+
+    testStream(joined)(
+      MultiAddData((input1, inputDataForInput1), (input2, inputDataForInput2)),
+      CheckNewAnswer(expectedOutput.head, expectedOutput.tail: _*)
+    )
+  }
 }

