HeartSaVioR commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843414788
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
     if (index != numValues - 1) {
       val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
       if (valuePairAtMaxIndex != null) {
+        // likely case where last element is non-null and we can simply swap with index
         keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
           valuePairAtMaxIndex.matched)
       } else {
-        val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-        logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-          s"at current key $projectedKey.")
+        // Find the rightmost non null index and swap values with that index,
+        // if index returned is not the same as the passed one
+        val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+        if (nonNullIndex != index) {
+          val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+          keyWithIndexToValue.put(currentKey, index, valuePair.value,
+            valuePair.matched)
+        }
+
+        // If nulls were found at the end, get the projected key and log a warning
+        // for the range of null indices.
+        if (nonNullIndex != numValues - 1) {
+          val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+          logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+            s"with range from startIndex=${nonNullIndex + 1} " +
+            s"and endIndex=${numValues - 1} " +
+            s"at currentKey=$projectedKey.")

Review Comment:
   Yeah, there are two options: log or don't. I don't think obfuscation (which we would be able to decode ourselves) would work in any way. We are assuming attackers here, who are experts at figuring out how to decode arbitrarily encoded data, and we expose the very code that shows how it is encoded. Whatever we can do, they can eventually do. I generally agree that logging data could be considered a security issue.
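   (For reference, the deletion strategy in the diff above can be modeled standalone. This is a minimal sketch, not Spark code: a plain `Array[String]` stands in for the per-key value list of `keyWithIndexToValue`, `null` slots model the corrupted entries the patch guards against, and the names `removeAt` / `rightMostNonNullIndex` are illustrative, not Spark APIs.)

```scala
object SwapDeleteSketch {
  // Rightmost index in [from, values.length) whose slot is non-null, if any.
  def rightMostNonNullIndex(values: Array[String], from: Int): Option[Int] =
    (values.length - 1 to from by -1).find(i => values(i) != null)

  // Remove the element at `index` by swapping in a value from the right,
  // mirroring the swap-with-last trick in the diff above.
  def removeAt(values: Array[String], index: Int): Array[String] = {
    val last = values.length - 1
    if (index != last && values(last) != null) {
      // Common (likely) case: the last slot is valid, swap it in directly.
      values(index) = values(last)
      values(last) = null
    } else if (index != last) {
      // Last slot is null (corrupted): fall back to the rightmost non-null slot.
      rightMostNonNullIndex(values, index + 1) match {
        case Some(src) =>
          values(index) = values(src)
          values(src) = null
          // The patch logs a warning here for the null range (src + 1)..last.
        case None =>
          values(index) = null // nothing valid to the right; just drop the slot
      }
    } else {
      values(index) = null
    }
    values.filter(_ != null) // compact; the real store decrements numValues instead
  }
}
```

   Running `removeAt(Array("a", "b", "c", "d"), 1)` yields `Array("a", "d", "c")` (plain swap-with-last), while `removeAt(Array("a", "b", "c", null), 1)` yields `Array("a", "c")`, taking the fallback path the patch adds.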
If we were required to follow a clear and stable security policy, the decision would have been obvious; but we don't yet have such a policy, so it is up to the reviewers/committers. I'm OK with just logging the symptom without any context about the data for now, and we should probably also propose hiding data in other places where it is logged. This also seems like a strong rationale for having a state data source, at least with read functionality: you stop the query once you figure out something is broken (Spark will/should notice it), and you can then run a query against the checkpoint to determine whether the state is already broken and, if so, which rows are affected. In many cases you may not be able to restore a broken row to a valid one, but knowing which row is broken is valuable for assessing the impact and taking further action. Even better if we have basic write functionality to fix the broken rows as well.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org