Repository: spark
Updated Branches:
  refs/heads/branch-1.4 f67a27d02 -> 1f90a06bd


[SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures.

Author: Reynold Xin <r...@databricks.com>

Closes #6608 from rxin/parquet-analysis and squashes the following commits:

b5dc8e2 [Reynold Xin] Code review feedback.
5617cf6 [Reynold Xin] [SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures.

(cherry picked from commit 939e4f3d8def16dfe03f0196be8e1c218a9daa32)
Signed-off-by: Reynold Xin <r...@databricks.com>
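
For context on the user-visible effect, a minimal hypothetical session (the
SQLContext setup, column name, and output path below are illustrative, not
part of this patch). After this change, a data type/name problem detected
during Parquet setup surfaces as org.apache.spark.sql.AnalysisException
instead of a bare RuntimeException from sys.error, so callers can handle it
as an analysis-time error:

    import org.apache.spark.sql.AnalysisException

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)  // assumes an existing SparkContext `sc`
    import sqlContext.implicits._

    // "bad,name" contains ',', one of the characters the name check rejects.
    val df = Seq((1, "a"), (2, "b")).toDF("id", "bad,name")

    try {
      df.write.parquet("/tmp/parquet-demo")
    } catch {
      case e: AnalysisException =>
        // Before this patch this arrived as a RuntimeException from sys.error.
        println("Schema problem: " + e.getMessage)
    }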


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f90a06b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f90a06b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f90a06b

Branch: refs/heads/branch-1.4
Commit: 1f90a06bda985ae8508e0439d11405d294fde2ec
Parents: f67a27d
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Jun 3 13:57:57 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jun 3 13:58:15 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/ParquetTypes.scala | 20 +++++++++-----------
 .../apache/spark/sql/parquet/newParquet.scala   | 14 ++++++++------
 2 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f90a06b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 6698b19..f8a5d84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.parquet
 
 import java.io.IOException
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
@@ -33,12 +33,11 @@ import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
 import parquet.schema.Type.Repetition
 import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
 
+import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.types._
-import org.apache.spark.{Logging, SparkException}
 
-// Implicits
-import scala.collection.JavaConversions._
 
 /** A class representing Parquet info fields we care about, for passing back to Parquet */
 private[parquet] case class ParquetTypeInfo(
@@ -73,13 +72,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
       case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
       case ParquetPrimitiveTypeName.INT96 =>
         // TODO: add BigInteger type? TODO(andre) use DecimalType instead????
-        sys.error("Potential loss of precision: cannot convert INT96")
+        throw new AnalysisException("Potential loss of precision: cannot 
convert INT96")
       case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
         if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) =>
           // TODO: for now, our reader only supports decimals that fit in a Long
           DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
-      case _ => sys.error(
-        s"Unsupported parquet datatype $parquetType")
+      case _ => throw new AnalysisException(s"Unsupported parquet datatype $parquetType")
     }
   }
 
@@ -371,7 +369,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
             parquetKeyType,
             parquetValueType)
         }
-        case _ => sys.error(s"Unsupported datatype $ctype")
+        case _ => throw new AnalysisException(s"Unsupported datatype $ctype")
       }
     }
   }
@@ -403,7 +401,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def convertFromString(string: String): Seq[Attribute] = {
     Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
       case s: StructType => s.toAttributes
-      case other => sys.error(s"Can convert $string to row")
+      case other => throw new AnalysisException(s"Could not convert $string to row")
     }
   }
 
@@ -411,8 +409,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
     // ,;{}()\n\t= and space character are special characters in Parquet schema
     schema.map(_.name).foreach { name =>
       if (name.matches(".*[ ,;{}()\n\t=].*")) {
-        sys.error(
-          s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\n\t=".
+        throw new AnalysisException(
+          s"""Attribute name "$name" contains invalid character(s) among " 
,;{}()\\n\\t=".
              |Please use alias to rename it.
            """.stripMargin.split("\n").mkString(" "))
       }
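
A self-contained sketch of the validation in the hunk above, for illustration
only: the object and method names are hypothetical, and the package line is
needed because AnalysisException's constructor is protected[sql].

    package org.apache.spark.sql.parquet

    import org.apache.spark.sql.AnalysisException

    // Hypothetical standalone rendering of the check above: reject any
    // attribute name containing a character that is special in Parquet schemas.
    object NameCheckSketch {
      def checkNames(names: Seq[String]): Unit = names.foreach { name =>
        if (name.matches(".*[ ,;{}()\n\t=].*")) {
          throw new AnalysisException(
            s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
               |Please use alias to rename it.
             """.stripMargin.split("\n").mkString(" "))
        }
      }
    }

    // NameCheckSketch.checkNames(Seq("id", "value"))  // passes
    // NameCheckSketch.checkNames(Seq("a=b"))          // throws AnalysisException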

http://git-wip-us.apache.org/repos/asf/spark/blob/1f90a06b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 824ae36..bf55e23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -39,6 +39,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
@@ -83,7 +84,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
             case partFilePattern(id) => id.toInt
             case name if name.startsWith("_") => 0
             case name if name.startsWith(".") => 0
-            case name => sys.error(
+            case name => throw new AnalysisException(
               s"Trying to write Parquet files to directory $outputPath, " +
                 s"but found items with illegal name '$name'.")
           }.reduceOption(_ max _).getOrElse(0)
@@ -380,11 +381,12 @@ private[sql] class ParquetRelation2(
       // time-consuming.
       if (dataSchema == null) {
         dataSchema = {
-          val dataSchema0 =
-            maybeDataSchema
-              .orElse(readSchema())
-              .orElse(maybeMetastoreSchema)
-              .getOrElse(sys.error("Failed to get the schema."))
+          val dataSchema0 = maybeDataSchema
+            .orElse(readSchema())
+            .orElse(maybeMetastoreSchema)
+            .getOrElse(throw new AnalysisException(
+              s"Failed to discover schema of Parquet file(s) in the following 
location(s):\n" +
+                paths.mkString("\n\t")))
 
           // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
           // case insensitivity issue and possible schema mismatch (probably caused by schema
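
The schema-resolution hunk above is an Option fallback chain. A minimal
sketch of that pattern with illustrative stand-in values (again placed under
org.apache.spark.sql.parquet only because AnalysisException's constructor is
protected[sql]):

    package org.apache.spark.sql.parquet

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types.StructType

    object SchemaFallbackSketch {
      // Illustrative stand-ins for the three schema sources tried in order.
      val maybeDataSchema: Option[StructType] = None        // user-supplied schema
      def readSchema(): Option[StructType] = None           // schema merged from Parquet footers
      val maybeMetastoreSchema: Option[StructType] = None   // schema from the Hive Metastore
      val paths = Seq("hdfs://nn/warehouse/t/part-00000.parquet")

      // orElse takes its argument by name, so each fallback runs only if every
      // earlier source returned None; when all fail, the error names the paths
      // that were searched instead of the old opaque "Failed to get the schema."
      def dataSchema: StructType = maybeDataSchema
        .orElse(readSchema())
        .orElse(maybeMetastoreSchema)
        .getOrElse(throw new AnalysisException(
          "Failed to discover schema of Parquet file(s) in the following location(s):\n" +
            paths.mkString("\n\t")))
    }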

