Repository: spark
Updated Branches:
  refs/heads/branch-1.4 90525c9ba -> a25ce91f9


[SPARK-7847] [SQL] Fixes dynamic partition directory escaping

Please refer to [SPARK-7847] [1] for details.

[1]: https://issues.apache.org/jira/browse/SPARK-7847
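
For context: the patch percent-escapes dynamic partition values when they are rendered into
directory names (and unescapes them again during partition discovery), and converts partition
values from Catalyst's internal representation to Scala types before rendering. A minimal usage
sketch, not part of this commit, assuming a Spark 1.4 SQLContext named sqlContext with its
implicits imported and a writable path /tmp/spark-7847-demo:

    import sqlContext.implicits._

    // Partition values containing reserved path characters such as '/' are written to
    // percent-escaped directories (e.g. s=%2f) and come back unescaped on read.
    val df = Seq(0 -> "/", 1 -> "[]", 2 -> "?").toDF("i", "s")
    df.write.format("parquet").partitionBy("s").save("/tmp/spark-7847-demo")
    sqlContext.read.parquet("/tmp/spark-7847-demo").show()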

Author: Cheng Lian <l...@databricks.com>

Closes #6389 from liancheng/spark-7847 and squashes the following commits:

935c652 [Cheng Lian] Adds test case for writing various data types as dynamic partition value
f4fc398 [Cheng Lian] Converts partition columns to Scala type when writing dynamic partitions
d0aeca0 [Cheng Lian] Fixes dynamic partition directory escaping

(cherry picked from commit 15459db4f6867e95076cf53fade2fca833c4cf4e)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a25ce91f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a25ce91f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a25ce91f

Branch: refs/heads/branch-1.4
Commit: a25ce91f9685604cfb567a6860182ba467ceed8d
Parents: 90525c9
Author: Cheng Lian <l...@databricks.com>
Authored: Wed May 27 10:09:12 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 27 10:09:20 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 22 ++++--
 .../spark/sql/sources/PartitioningUtils.scala   | 76 +++++++++++++++++++-
 .../org/apache/spark/sql/sources/commands.scala | 57 ++-------------
 .../ParquetPartitionDiscoverySuite.scala        | 57 ++++++++++++++-
 4 files changed, 152 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a25ce91f/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index cb1e608..8b3e1b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -282,21 +283,28 @@ private[sql] class ParquetRelation2(
         val cacheMetadata = useMetadataCache
 
         @transient val cachedStatuses = inputFiles.map { f =>
-          // In order to encode the authority of a Path containing special characters such as /,
-          // we need to use the string returned by the URI of the path to create a new Path.
-          val pathWithAuthority = new Path(f.getPath.toUri.toString)
-
+          // In order to encode the authority of a Path containing special characters such as '/'
+          // (which does happen in some S3N credentials), we need to use the string returned by the
+          // URI of the path to create a new Path.
+          val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
           new FileStatus(
             f.getLen, f.isDir, f.getReplication, f.getBlockSize, f.getModificationTime,
-            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithAuthority)
+            f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
         }.toSeq
 
         @transient val cachedFooters = footers.map { f =>
           // In order to encode the authority of a Path containing special characters such as /,
           // we need to use the string returned by the URI of the path to create a new Path.
-          new Footer(new Path(f.getFile.toUri.toString), f.getParquetMetadata)
+          new Footer(escapePathUserInfo(f.getFile), f.getParquetMetadata)
         }.toSeq
 
+        private def escapePathUserInfo(path: Path): Path = {
+          val uri = path.toUri
+          new Path(new URI(
+            uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
+            uri.getQuery, uri.getFragment))
+        }
+
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
           val inputFormat = if (cacheMetadata) {
@@ -377,7 +385,7 @@ private[sql] class ParquetRelation2(
               .orElse(readSchema())
               .orElse(maybeMetastoreSchema)
               .getOrElse(sys.error("Failed to get the schema."))
-        
+
          // If this Parquet relation is converted from a Hive Metastore table, must reconcile case
          // case insensitivity issue and possible schema mismatch (probably caused by schema
           // evolution).
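
A note on the escapePathUserInfo helper added above: it rebuilds the Path's URI from its
components, taking the user-info section via getRawUserInfo so that percent-escaped characters
survive. A small standalone sketch, independent of the patch, of why the raw accessor matters
for credentials that contain an escaped '/':

    import java.net.URI

    // Example URI whose user-info carries an escaped '/' (as some S3N credentials do).
    val uri = new URI("s3n://access:secret%2Fkey@bucket/path")
    println(uri.getUserInfo)     // access:secret/key   (decoded form)
    println(uri.getRawUserInfo)  // access:secret%2Fkey (escaped form, kept by the fix)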

http://git-wip-us.apache.org/repos/asf/spark/blob/a25ce91f/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index e0ead23..dafdf0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.Shell
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
@@ -221,7 +222,7 @@ private[sql] object PartitioningUtils {
       // Then falls back to string
       .getOrElse {
         if (raw == defaultPartitionName) Literal.create(null, NullType)
-        else Literal.create(raw, StringType)
+        else Literal.create(unescapePathName(raw), StringType)
       }
   }
 
@@ -243,4 +244,77 @@ private[sql] object PartitioningUtils {
       Literal.create(Cast(l, desiredType).eval(), desiredType)
     }
   }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  val charToEscape = {
+    val bitSet = new java.util.BitSet(128)
+
+    /**
+     * ASCII 01-1F are HTTP control characters that need to be escaped.
+     * \u000A and \u000D are \n and \r, respectively.
+     */
+    val clist = Array(
+      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
+      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
+      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
+      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
+      '{', '[', ']', '^')
+
+    clist.foreach(bitSet.set(_))
+
+    if (Shell.WINDOWS) {
+      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
+    }
+
+    bitSet
+  }
+
+  def needsEscaping(c: Char): Boolean = {
+    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
+  }
+
+  def escapePathName(path: String): String = {
+    val builder = new StringBuilder()
+    path.foreach { c =>
+      if (needsEscaping(c)) {
+        builder.append('%')
+        builder.append(f"${c.asInstanceOf[Int]}%02x")
+      } else {
+        builder.append(c)
+      }
+    }
+
+    builder.toString()
+  }
+
+  def unescapePathName(path: String): String = {
+    val sb = new StringBuilder
+    var i = 0
+
+    while (i < path.length) {
+      val c = path.charAt(i)
+      if (c == '%' && i + 2 < path.length) {
+        val code: Int = try {
+          Integer.valueOf(path.substring(i + 1, i + 3), 16)
+        } catch { case e: Exception =>
+          -1: Integer
+        }
+        if (code >= 0) {
+          sb.append(code.asInstanceOf[Char])
+          i += 3
+        } else {
+          sb.append(c)
+          i += 1
+        }
+      } else {
+        sb.append(c)
+        i += 1
+      }
+    }
+
+    sb.toString()
+  }
 }
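
As a rough illustration of the escaping scheme introduced above, a standalone sketch (not code
from the patch, and with the reserved-character set abbreviated from the Hive-derived list):
each reserved character is replaced by '%' followed by its two-digit hex code, and unescaping
reverses the substitution.

    // Escape reserved characters as %xx (lowercase hex), mirroring escapePathName above.
    def escape(s: String): String = s.flatMap {
      case c if c < ' ' || c == '\u007F' || "\"#%'*/:=?\\{[]^".contains(c) =>
        f"%%${c.toInt}%02x"
      case c => c.toString
    }

    // Reverse the substitution; unlike unescapePathName above, this sketch does not
    // tolerate malformed %xx sequences.
    def unescape(s: String): String = {
      val out = new StringBuilder
      var i = 0
      while (i < s.length) {
        if (s.charAt(i) == '%' && i + 2 < s.length) {
          out.append(Integer.parseInt(s.substring(i + 1, i + 3), 16).toChar)
          i += 3
        } else {
          out.append(s.charAt(i))
          i += 1
        }
      }
      out.toString
    }

    escape("2015/05/27")             // "2015%2f05%2f27"
    unescape(escape("2015/05/27"))   // "2015/05/27"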

http://git-wip-us.apache.org/repos/asf/spark/blob/a25ce91f/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index fbd98ef..3132067 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark._
@@ -35,7 +34,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -208,9 +208,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
           writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart)
         }
       } else {
+        val partitionSchema = StructType.fromAttributes(partitionOutput)
+        val converter = CatalystTypeConverters.createToScalaConverter(partitionSchema)
         while (iterator.hasNext) {
           val row = iterator.next()
-          val partitionPart = partitionProj(row)
+          val partitionPart = converter(partitionProj(row)).asInstanceOf[Row]
           val dataPart = dataProj(row)
           writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
@@ -416,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
       val valueString = if (string == null || string.isEmpty) {
         defaultPartitionName
       } else {
-        DynamicPartitionWriterContainer.escapePathName(string)
+        PartitioningUtils.escapePathName(string)
       }
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
@@ -448,50 +450,3 @@ private[sql] class DynamicPartitionWriterContainer(
     }
   }
 }
-
-private[sql] object DynamicPartitionWriterContainer {
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-  // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils).
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  val charToEscape = {
-    val bitSet = new java.util.BitSet(128)
-
-    /**
-     * ASCII 01-1F are HTTP control characters that need to be escaped.
-     * \u000A and \u000D are \n and \r, respectively.
-     */
-    val clist = Array(
-      '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009',
-      '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013',
-      '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C',
-      '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F',
-      '{', '[', ']', '^')
-
-    clist.foreach(bitSet.set(_))
-
-    if (Shell.WINDOWS) {
-      Array(' ', '<', '>', '|').foreach(bitSet.set(_))
-    }
-
-    bitSet
-  }
-
-  def needsEscaping(c: Char): Boolean = {
-    c >= 0 && c < charToEscape.size() && charToEscape.get(c)
-  }
-
-  def escapePathName(path: String): String = {
-    val builder = new StringBuilder()
-    path.foreach { c =>
-      if (DynamicPartitionWriterContainer.needsEscaping(c)) {
-        builder.append('%')
-        builder.append(f"${c.asInstanceOf[Int]}%02x")
-      } else {
-        builder.append(c)
-      }
-    }
-
-    builder.toString()
-  }
-}
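
The converter introduced in InsertIntoHadoopFsRelation above matters because the projected
partition values may still be in Catalyst's internal representation (for example, DateType values
stored as Int day counts), which would otherwise be rendered verbatim into directory names. A
hedged sketch of building that converter with the same helper the patch calls; the column names
here are made up:

    import org.apache.spark.sql.catalyst.CatalystTypeConverters
    import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}

    // Converter over the partition columns only. Applying it to a projected partition
    // row yields external Scala values (e.g. java.sql.Date rather than an Int day
    // count), so the directory becomes d=2015-05-23 instead of d=16578.
    val partitionSchema = StructType(Seq(
      StructField("d", DateType),
      StructField("s", StringType)))
    val toScala = CatalystTypeConverters.createToScalaConverter(partitionSchema)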

http://git-wip-us.apache.org/repos/asf/spark/blob/a25ce91f/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 90d4528..f231589 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.parquet
 
 import java.io.File
+import java.math.BigInteger
+import java.sql.{Timestamp, Date}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -27,7 +29,7 @@ import org.apache.spark.sql.sources.PartitioningUtils._
 import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, Row, SQLContext}
+import org.apache.spark.sql.{Column, QueryTest, Row, SQLContext}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -377,4 +379,57 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       }
     }
   }
+
+  test("SPARK-7847: Dynamic partition directory path escaping and unescaping") 
{
+    withTempPath { dir =>
+      val df = Seq("/", "[]", "?").zipWithIndex.map(_.swap).toDF("i", "s")
+      df.write.format("parquet").partitionBy("s").save(dir.getCanonicalPath)
+      checkAnswer(read.parquet(dir.getCanonicalPath), df.collect())
+    }
+  }
+
+  test("Various partition value types") {
+    val row =
+      Row(
+        100.toByte,
+        40000.toShort,
+        Int.MaxValue,
+        Long.MaxValue,
+        1.5.toFloat,
+        4.5,
+        new java.math.BigDecimal(new BigInteger("212500"), 5),
+        new java.math.BigDecimal(2.125),
+        java.sql.Date.valueOf("2015-05-23"),
+        new Timestamp(0),
+        "This is a string, /[]?=:",
+        "This is not a partition column")
+
+    // BooleanType is not supported yet
+    val partitionColumnTypes =
+      Seq(
+        ByteType,
+        ShortType,
+        IntegerType,
+        LongType,
+        FloatType,
+        DoubleType,
+        DecimalType(10, 5),
+        DecimalType.Unlimited,
+        DateType,
+        TimestampType,
+        StringType)
+
+    val partitionColumns = partitionColumnTypes.zipWithIndex.map {
+      case (t, index) => StructField(s"p_$index", t)
+    }
+
+    val schema = StructType(partitionColumns :+ StructField(s"i", StringType))
+    val df = createDataFrame(sparkContext.parallelize(row :: Nil), schema)
+
+    withTempPath { dir =>
+      df.write.format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString)
+      val fields = schema.map(f => Column(f.name).cast(f.dataType))
+      checkAnswer(read.load(dir.toString).select(fields: _*), row)
+    }
+  }
 }
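
A note on the read-back step in the test above: values recovered from partition directory names
go through partition-value type inference, so the test casts each column back to the original
schema's type before comparing with the input row. A minimal sketch of that pattern (df and
schema are placeholders for a discovered DataFrame and its intended StructType):

    import org.apache.spark.sql.functions.col

    // Cast every column back to the type recorded in the intended schema.
    val restored = df.select(schema.map(f => col(f.name).cast(f.dataType)): _*)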

