HyukjinKwon commented on a change in pull request #27239: [SPARK-30530][SQL] Fix filter pushdown for bad CSV records
URL: https://github.com/apache/spark/pull/27239#discussion_r368380662
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
##########
@@ -230,64 +230,55 @@ class UnivocityParser(
         () => getCurrentInput,
         () => None,
         new RuntimeException("Malformed CSV record"))
-    } else if (tokens.length != parsedSchema.length) {
+    }
+
+    var checkedTokens = tokens
+    var badRecordException: Option[Throwable] = None
+
+    if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens, by adding extra null tokens in
       // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
-      val checkedTokens = if (parsedSchema.length > tokens.length) {
+      checkedTokens = if (parsedSchema.length > tokens.length) {
         tokens ++ new Array[String](parsedSchema.length - tokens.length)
       } else {
         tokens.take(parsedSchema.length)
       }
-      def getPartialResult(): Option[InternalRow] = {
-        try {
-          convert(checkedTokens).headOption
-        } catch {
-          case _: BadRecordException => None
-        }
-      }
-      // For records with less or more tokens than the schema, tries to return partial results
-      // if possible.
-      throw BadRecordException(
-        () => getCurrentInput,
-        () => getPartialResult(),
-        new RuntimeException("Malformed CSV record"))
-    } else {
-      // When the length of the returned tokens is identical to the length of the parsed schema,
-      // we just need to:
-      //  1. Convert the tokens that correspond to the required schema.
-      //  2. Apply the pushdown filters to `requiredRow`.
-      var i = 0
-      val row = requiredRow.head
-      var skipRow = false
-      var badRecordException: Option[Throwable] = None
-      while (i < requiredSchema.length) {
-        try {
-          if (!skipRow) {
-            row(i) = valueConverters(i).apply(getToken(tokens, i))
-            if (csvFilters.skipRow(row, i)) {
-              skipRow = true
-            }
-          }
-          if (skipRow) {
-            row.setNullAt(i)
+      badRecordException = Some(new RuntimeException("Malformed CSV record"))
+    }
+    // When the length of the returned tokens is identical to the length of the parsed schema,
+    // we just need to:
+    //  1. Convert the tokens that correspond to the required schema.
+    //  2. Apply the pushdown filters to `requiredRow`.
+    var i = 0
+    val row = requiredRow.head
+    var skipRow = false
+    while (i < requiredSchema.length) {
+      try {
+        if (!skipRow) {
+          row(i) = valueConverters(i).apply(getToken(tokens, i))
+          if (csvFilters.skipRow(row, i)) {
+            skipRow = true
           }
-        } catch {
-          case NonFatal(e) =>
-            badRecordException = badRecordException.orElse(Some(e))
-            row.setNullAt(i)
         }
-        i += 1
+        if (skipRow) {
+          row.setNullAt(i)
+        }
+      } catch {
+        case NonFatal(e) =>

Review comment:
   Previously we relied on `null`s already existing in the array. Now we rely on `java.lang.ArrayIndexOutOfBoundsException`. I don't particularly like this approach, but I'm fine with it since it does simplify the code.
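To make the reviewer's point concrete, here is a minimal standalone Scala sketch, not the actual `UnivocityParser` code: `schemaLength`, `tokens`, and the two helper methods are hypothetical stand-ins. It contrasts the two strategies for a record with fewer tokens than the schema: the old code padded the token array with `null`s up front so every index access succeeded, while the new loop indexes the original array and lets the resulting `ArrayIndexOutOfBoundsException` flow into the `NonFatal` handler, which records the error and nulls out the field.

```scala
import scala.util.control.NonFatal

// Minimal sketch contrasting the two strategies described in the review.
// All names here are illustrative; none of this is Spark's actual code.
object ShortRecordSketch {
  val schemaLength = 4
  val tokens = Array("a", "b") // a malformed record: fewer tokens than fields

  // Old approach: pad with nulls first, so every index access succeeds and
  // missing fields are read back as explicit nulls.
  def withPadding(): Seq[String] = {
    val padded = tokens ++ new Array[String](schemaLength - tokens.length)
    (0 until schemaLength).map(i => padded(i)) // indices 2 and 3 are null
  }

  // New approach: index the original (unpadded) array; the out-of-bounds
  // access throws ArrayIndexOutOfBoundsException, which NonFatal matches,
  // so the handler records the first error and nulls out the field,
  // mirroring the `case NonFatal(e)` branch in the diff above.
  def withException(): Seq[String] = {
    var firstError: Option[Throwable] = None
    (0 until schemaLength).map { i =>
      try tokens(i)
      catch {
        case NonFatal(e) =>
          firstError = firstError.orElse(Some(e))
          null
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(withPadding())   // Vector(a, b, null, null)
    println(withException()) // Vector(a, b, null, null)
  }
}
```

Both paths produce the same row contents for a short record, which is presumably why the reviewer accepts the change despite disliking exception-driven control flow.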