Repository: spark
Updated Branches:
  refs/heads/master be85245a9 -> 0b903caef
[SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql core

## What changes were proposed in this pull request?

The `FailureSafeParser` is only used in sql core, so it doesn't make sense to put it in the catalyst module.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #17408 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b903cae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b903cae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b903cae

Branch: refs/heads/master
Commit: 0b903caef3183c5113feb09995874f6a07aa6698
Parents: be85245
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Sat Mar 25 11:46:54 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Sat Mar 25 11:46:54 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/util/BadRecordException.scala  | 33 ++++++++
 .../sql/catalyst/util/FailureSafeParser.scala   | 83 --------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  3 +-
 .../datasources/FailureSafeParser.scala         | 72 +++++++++++++++++
 .../datasources/csv/UnivocityParser.scala       |  3 +-
 .../datasources/json/JsonDataSource.scala       |  3 +-
 6 files changed, 109 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
new file mode 100644
index 0000000..985f0dc
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Exception thrown when the underlying parser meets a bad record and can't parse it.
+ * @param record a function to return the record that caused the parser to fail
+ * @param partialResult a function that returns an optional row, which is the partial result of
+ *                      parsing this bad record.
+ * @param cause the underlying exception explaining why the record is bad and can't be parsed.
+ */
+case class BadRecordException(
+    record: () => UTF8String,
+    partialResult: () => Option[InternalRow],
+    cause: Throwable) extends Exception(cause)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
deleted file mode 100644
index 725e301..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.util
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-
-class FailureSafeParser[IN](
-    rawParser: IN => Seq[InternalRow],
-    mode: ParseMode,
-    schema: StructType,
-    columnNameOfCorruptRecord: String) {
-
-  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
-  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
-  // schema doesn't contain a field for corrupted record, we just return the partial result or a
-  // row with all fields null. If the given schema contains a field for corrupted record, we will
-  // set the bad record to this field, and set other fields according to the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
-    if (corruptFieldIndex.isDefined) {
-      (row, badRecord) => {
-        var i = 0
-        while (i < actualSchema.length) {
-          val from = actualSchema(i)
-          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
-          i += 1
-        }
-        resultRow(corruptFieldIndex.get) = badRecord()
-        resultRow
-      }
-    } else {
-      (row, _) => row.getOrElse(nullResult)
-    }
-  }
-
-  def parse(input: IN): Iterator[InternalRow] = {
-    try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-    } catch {
-      case e: BadRecordException => mode match {
-        case PermissiveMode =>
-          Iterator(toResultRow(e.partialResult(), e.record))
-        case DropMalformedMode =>
-          Iterator.empty
-        case FailFastMode =>
-          throw e.cause
-      }
-    }
-  }
-}
-
-/**
- * Exception thrown when the underlying parser meet a bad record and can't parse it.
- * @param record a function to return the record that cause the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
- *                      parsing this bad record.
- * @param cause the actual exception about why the record is bad and can't be parsed.
- */
-case class BadRecordException(
-    record: () => UTF8String,
-    partialResult: () => Option[InternalRow],
-    cause: Throwable) extends Exception(cause)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6d2b1b..6c23861 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,11 +27,10 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
 import org.apache.spark.sql.execution.datasources.csv._
-import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.types.{StringType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
new file mode 100644
index 0000000..159aef2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+    rawParser: IN => Seq[InternalRow],
+    mode: ParseMode,
+    schema: StructType,
+    columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
+
+  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+  // schema doesn't contain a field for the corrupted record, we just return the partial result or
+  // a row with all fields null. If the given schema contains a field for the corrupted record, we
+  // set the bad record to this field, and set other fields according to the partial result or null.
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val from = actualSchema(i)
+          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+          i += 1
+        }
+        resultRow(corruptFieldIndex.get) = badRecord()
+        resultRow
+      }
+    } else {
+      (row, _) => row.getOrElse(nullResult)
+    }
+  }
+
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException => mode match {
+        case PermissiveMode =>
+          Iterator(toResultRow(e.partialResult(), e.record))
+        case DropMalformedMode =>
+          Iterator.empty
+        case FailFastMode =>
+          throw e.cause
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 263f77e..c3657ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -30,7 +30,8 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.FailureSafeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 51e952c..4f2963d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -33,8 +33,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
-import org.apache.spark.sql.execution.datasources.{CodecStreams, DataSource, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
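
Editor's note: the relocated FailureSafeParser is what backs the `mode` and
`columnNameOfCorruptRecord` options on the public DataFrameReader. The following
is a minimal sketch (not part of this commit; the input path and session setup
are illustrative assumptions) of the three parse modes matched in `parse` above,
seen from the user's side:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ParseModeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("parse-mode-demo")
      .getOrCreate()

    // Include an explicit corrupt-record column so PERMISSIVE mode has
    // somewhere to stash the raw text of a malformed record.
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType),
      StructField("_corrupt_record", StringType)))

    // PERMISSIVE (the default): a bad record lands in _corrupt_record and the
    // other fields are null (the corruptFieldIndex branch of toResultRow above).
    spark.read
      .schema(schema)
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .json("/tmp/records.json") // hypothetical input path
      .show(false)

    // DROPMALFORMED silently drops bad records (Iterator.empty above);
    // FAILFAST rethrows the underlying cause on the first bad record.
    spark.stop()
  }
}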
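And a hedged sketch of wiring a raw parser into FailureSafeParser at its new
location, based only on the constructor shown in this diff; the rawParse closure
and the helper object are hypothetical stand-ins for what real callers such as
UnivocityParser and JsonDataSource pass in:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.PermissiveMode
import org.apache.spark.sql.execution.datasources.FailureSafeParser
import org.apache.spark.sql.types.StructType

object FailureSafeWiring {
  // Wraps any String => Seq[InternalRow] parser so a malformed input record is
  // handled according to the parse mode instead of failing the whole task.
  def forStrings(
      rawParse: String => Seq[InternalRow], // e.g. a Jackson- or Univocity-backed closure
      schema: StructType): FailureSafeParser[String] = {
    new FailureSafeParser[String](
      rawParse,
      PermissiveMode, // or DropMalformedMode / FailFastMode, as matched in parse()
      schema,
      "_corrupt_record") // must name a StringType field in `schema` for bad rows to be captured
  }
}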