Repository: spark
Updated Branches:
  refs/heads/master be85245a9 -> 0b903caef


[SPARK-19949][SQL][FOLLOW-UP] move FailureSafeParser from catalyst to sql core

## What changes were proposed in this pull request?

The `FailureSafeParser` is only used in sql core, so it doesn't make sense to put it in the catalyst module.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #17408 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b903cae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b903cae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b903cae

Branch: refs/heads/master
Commit: 0b903caef3183c5113feb09995874f6a07aa6698
Parents: be85245
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Sat Mar 25 11:46:54 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Sat Mar 25 11:46:54 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/util/BadRecordException.scala  | 33 ++++++++
 .../sql/catalyst/util/FailureSafeParser.scala   | 83 --------------------
 .../org/apache/spark/sql/DataFrameReader.scala  |  3 +-
 .../datasources/FailureSafeParser.scala         | 72 +++++++++++++++++
 .../datasources/csv/UnivocityParser.scala       |  3 +-
 .../datasources/json/JsonDataSource.scala       |  3 +-
 6 files changed, 109 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
new file mode 100644
index 0000000..985f0dc
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Exception thrown when the underlying parser meets a bad record and can't parse it.
+ * @param record a function to return the record that caused the parser to fail
+ * @param partialResult a function that returns an optional row, which is the partial result of
+ *                      parsing this bad record.
+ * @param cause the actual exception that explains why the record is bad and can't be parsed.
+ */
+case class BadRecordException(
+    record: () => UTF8String,
+    partialResult: () => Option[InternalRow],
+    cause: Throwable) extends Exception(cause)
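
For context, a row-level parser would typically raise this exception as in
the minimal sketch below. `doParse` is a hypothetical stand-in for a real
low-level parsing routine, not an actual Spark API:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.util.BadRecordException
    import org.apache.spark.unsafe.types.UTF8String

    def parseRecord(rawRecord: UTF8String): Seq[InternalRow] = {
      try {
        doParse(rawRecord) // hypothetical: UTF8String => Seq[InternalRow]
      } catch {
        case e: RuntimeException =>
          // Both arguments are thunks: the record text and the partial row
          // are only materialized if the caller (e.g. FailureSafeParser in
          // PERMISSIVE mode) actually needs them.
          throw BadRecordException(() => rawRecord, () => None, e)
      }
    }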

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
deleted file mode 100644
index 725e301..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.util
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.unsafe.types.UTF8String
-
-class FailureSafeParser[IN](
-    rawParser: IN => Seq[InternalRow],
-    mode: ParseMode,
-    schema: StructType,
-    columnNameOfCorruptRecord: String) {
-
-  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
-  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
-  private val resultRow = new GenericInternalRow(schema.length)
-  private val nullResult = new GenericInternalRow(schema.length)
-
-  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
-  // schema doesn't contain a field for the corrupted record, we just return the partial result or a
-  // row with all fields null. If the given schema contains a field for the corrupted record, we will
-  // set the bad record to this field, and set other fields according to the partial result or null.
-  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
-    if (corruptFieldIndex.isDefined) {
-      (row, badRecord) => {
-        var i = 0
-        while (i < actualSchema.length) {
-          val from = actualSchema(i)
-          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
-          i += 1
-        }
-        resultRow(corruptFieldIndex.get) = badRecord()
-        resultRow
-      }
-    } else {
-      (row, _) => row.getOrElse(nullResult)
-    }
-  }
-
-  def parse(input: IN): Iterator[InternalRow] = {
-    try {
-      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
-    } catch {
-      case e: BadRecordException => mode match {
-        case PermissiveMode =>
-          Iterator(toResultRow(e.partialResult(), e.record))
-        case DropMalformedMode =>
-          Iterator.empty
-        case FailFastMode =>
-          throw e.cause
-      }
-    }
-  }
-}
-
-/**
- * Exception thrown when the underlying parser meets a bad record and can't parse it.
- * @param record a function to return the record that caused the parser to fail
- * @param partialResult a function that returns an optional row, which is the partial result of
- *                      parsing this bad record.
- * @param cause the actual exception that explains why the record is bad and can't be parsed.
- */
-case class BadRecordException(
-    record: () => UTF8String,
-    partialResult: () => Option[InternalRow],
-    cause: Throwable) extends Exception(cause)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6d2b1b..6c23861 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,11 +27,10 @@ import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
 import org.apache.spark.sql.execution.datasources.csv._
-import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.types.{StringType, StructType}

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
new file mode 100644
index 0000000..159aef2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+    rawParser: IN => Seq[InternalRow],
+    mode: ParseMode,
+    schema: StructType,
+    columnNameOfCorruptRecord: String) {
+
+  private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+  private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+  private val resultRow = new GenericInternalRow(schema.length)
+  private val nullResult = new GenericInternalRow(schema.length)
+
+  // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+  // schema doesn't contain a field for the corrupted record, we just return the partial result or a
+  // row with all fields null. If the given schema contains a field for the corrupted record, we will
+  // set the bad record to this field, and set other fields according to the partial result or null.
+  private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+    if (corruptFieldIndex.isDefined) {
+      (row, badRecord) => {
+        var i = 0
+        while (i < actualSchema.length) {
+          val from = actualSchema(i)
+          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+          i += 1
+        }
+        resultRow(corruptFieldIndex.get) = badRecord()
+        resultRow
+      }
+    } else {
+      (row, _) => row.getOrElse(nullResult)
+    }
+  }
+
+  def parse(input: IN): Iterator[InternalRow] = {
+    try {
+      rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+    } catch {
+      case e: BadRecordException => mode match {
+        case PermissiveMode =>
+          Iterator(toResultRow(e.partialResult(), e.record))
+        case DropMalformedMode =>
+          Iterator.empty
+        case FailFastMode =>
+          throw e.cause
+      }
+    }
+  }
+}
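
For context, a minimal sketch of wiring this parser up, assuming a
hypothetical row-level function `parseJson`, a schema `dataSchema`, and a
corrupt-record column name `corruptColumn` (none of which are defined in
this commit):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.util.PermissiveMode
    import org.apache.spark.sql.execution.datasources.FailureSafeParser
    import org.apache.spark.unsafe.types.UTF8String

    val safeParser = new FailureSafeParser[UTF8String](
      rawParser = input => parseJson(input), // hypothetical IN => Seq[InternalRow]
      mode = PermissiveMode,                 // or DropMalformedMode / FailFastMode
      schema = dataSchema,
      columnNameOfCorruptRecord = corruptColumn)

    // PERMISSIVE: emits one row per bad record; if dataSchema contains
    //   corruptColumn, the raw text is stored there and the other fields are
    //   filled from the partial result (or left null).
    // DROPMALFORMED: bad records yield no rows (Iterator.empty).
    // FAILFAST: the underlying cause is rethrown, failing the query.
    val rows: Iterator[InternalRow] = safeParser.parse(UTF8String.fromString("{oops"))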

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 263f77e..c3657ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -30,7 +30,8 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils, FailureSafeParser}
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.FailureSafeParser
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b903cae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 51e952c..4f2963d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -33,8 +33,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.catalyst.util.FailureSafeParser
-import org.apache.spark.sql.execution.datasources.{CodecStreams, DataSource, HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String

