HeartSaVioR commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843414788


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
         if (index != numValues - 1) {
           val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
           if (valuePairAtMaxIndex != null) {
+            // likely case where last element is non-null and we can simply swap with index
             keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
           } else {
-            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-            logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-              s"at current key $projectedKey.")
+            // Find the rightmost non null index and swap values with that index,
+            // if index returned is not the same as the passed one
+            val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+            if (nonNullIndex != index) {
+              val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+              keyWithIndexToValue.put(currentKey, index, valuePair.value,
+                valuePair.matched)
+            }
+
+            // If nulls were found at the end, get the projected key and log a warning
+            // for the range of null indices.
+            if (nonNullIndex != numValues - 1) {
+              val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+              logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+                s"with range from startIndex=${nonNullIndex + 1} " +
+                s"and endIndex=${numValues - 1} " +
+                s"at currentKey=$projectedKey.")

Review Comment:
   Yeah, there are two options: log or don't. I don't think obfuscation (which we could decode ourselves) would work in any way. We are assuming attackers here, who are experts at decoding arbitrarily encoded data, and we expose the code that does the encoding. Whatever we can do, they can eventually do as well.
   
   I generally agree that logging data could be considered a security issue. If we were required to follow a clear and stable security policy, the decision would be obvious, but we don't yet have such a policy, so it is up to the reviewers/committers.
   
   I'm OK with just logging the symptom without any context about the data for now, and we should probably propose hiding it in other places as well. This also seems like a strong rationale for having a state data source, at least with read functionality: you stop the query once you figure out something is broken (Spark will/should notice it), and can then run a query against the checkpoint to determine whether the state is already broken and, if it is, which rows are affected. In many cases you may not be able to repair a broken row, but knowing which rows are broken is valuable for determining the impact and taking further action.
   
   It would be even better if we also had basic write functionality to fix the broken rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

