anishshri-db commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843317644


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
         if (index != numValues - 1) {
           val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, 
numValues - 1)
           if (valuePairAtMaxIndex != null) {
+            // likely case where last element is non-null and we can simply 
swap with index
             keyWithIndexToValue.put(currentKey, index, 
valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
           } else {
-            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-            logWarning(s"`keyWithIndexToValue` returns a null value for index 
${numValues - 1} " +
-              s"at current key $projectedKey.")
+            // Find the rightmost non null index and swap values with that 
index,
+            // if index returned is not the same as the passed one
+            val nonNullIndex = getRightMostNonNullIndex(index + 
1).getOrElse(index)
+            if (nonNullIndex != index) {
+              val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+              keyWithIndexToValue.put(currentKey, index, valuePair.value,
+                valuePair.matched)
+            }
+
+            // If nulls were found at the end, get the projected key and log a 
warning
+            // for the range of null indices.
+            if (nonNullIndex != numValues - 1) {
+              val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+              logWarning(s"`keyWithIndexToValue` returns a null value for 
indices " +
+                s"with range from startIndex=${nonNullIndex + 1} " +
+                s"and endIndex=${numValues - 1} " +
+                s"at currentKey=$projectedKey.")

Review Comment:
   As discussed, we are only logging the projected join keys along with the 
range of null value indices. The newly added portion is only the indices. We 
were logging the projected keys previously too, so no change there:
   
   Here is an example:
   ```
   15:31:47.158 WARN 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager: 
`keyWithIndexToValue` returns a null value for indices with range from 
startIndex=3 and endIndex=4 at currentKey=[false,40,10.0].`
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to