Repository: spark Updated Branches: refs/heads/master 1d1de28a3 -> c4bd57602
[SPARK-12721][SQL] SQL Generation for Script Transformation #### What changes were proposed in this pull request? This PR adds support for generating SQL from analyzed logical plans that contain the `ScriptTransformation` operator. For example, below is a SQL statement containing `TRANSFORM`: ``` SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2 ``` Its logical plan looks like this: ``` ScriptTransformation [a#210L,b#211L,c#212L,d#213L], cat, [key#208,value#209], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim, )),List((field.delim, )),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true) +- SubqueryAlias parquet_t2 +- Relation[a#210L,b#211L,c#212L,d#213L] ParquetRelation ``` The generated SQL looks like this: ``` SELECT TRANSFORM (`parquet_t2`.`a`, `parquet_t2`.`b`, `parquet_t2`.`c`, `parquet_t2`.`d`) USING 'cat' AS (`key` string, `value` string) FROM `default`.`parquet_t2` ``` #### How was this patch tested? Seven test cases were added to `LogicalPlanToSQLSuite`. Author: gatorsmile <gatorsm...@gmail.com> Author: xiaoli <lixiao1...@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #11503 from gatorsmile/transformToSQL. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4bd5760 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4bd5760 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4bd5760 Branch: refs/heads/master Commit: c4bd57602c0b14188d364bb475631bf473d25082 Parents: 1d1de28 Author: gatorsmile <gatorsm...@gmail.com> Authored: Wed Mar 16 13:11:11 2016 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Wed Mar 16 13:11:11 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/hive/SQLBuilder.scala | 29 ++++++++++ .../hive/execution/ScriptTransformation.scala | 48 +++++++++++++++++ .../spark/sql/hive/LogicalPlanToSQLSuite.scala | 57 ++++++++++++++++++++ 3 files changed, 134 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c4bd5760/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala index 3bc8e9a..f3446a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.hive.execution.HiveScriptIOSchema import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType} /** @@ -184,6 +185,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi p.partitionExpressions.map(_.sql).mkString(", ") ) + case p: 
ScriptTransformation => + scriptTransformationToSQL(p) + case OneRowRelation => "" @@ -209,6 +213,31 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi ) } + private def scriptTransformationToSQL(plan: ScriptTransformation): String = { + val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema] + val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse( + throw new UnsupportedOperationException( + s"unsupported row format ${ioSchema.inputRowFormat}")) + val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse( + throw new UnsupportedOperationException( + s"unsupported row format ${ioSchema.outputRowFormat}")) + + val outputSchema = plan.output.map { attr => + s"${attr.sql} ${attr.dataType.simpleString}" + }.mkString(", ") + + build( + "SELECT TRANSFORM", + "(" + plan.input.map(_.sql).mkString(", ") + ")", + inputRowFormatSQL, + s"USING \'${plan.script}\'", + "AS (" + outputSchema + ")", + outputRowFormatSQL, + if (plan.child == OneRowRelation) "" else "FROM", + toSQL(plan.child) + ) + } + private def aggregateToSQL(plan: Aggregate): String = { val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ") build( http://git-wip-us.apache.org/repos/asf/spark/blob/c4bd5760/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index 3b53716..62e7c12 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -400,4 +400,52 @@ case class HiveScriptIOSchema ( instance } } + + def inputRowFormatSQL: Option[String] = + getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps) + + def 
outputRowFormatSQL: Option[String] = + getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps) + + /** + * Get the row format specification + * Note: + * 1. Changes are needed when readerClause and writerClause are supported. + * 2. Changes are needed when "ESCAPED BY" is supported. + */ + private def getRowFormatSQL( + rowFormat: Seq[(String, String)], + serdeClass: Option[String], + serdeProps: Seq[(String, String)]): Option[String] = { + if (schemaLess) return Some("") + + val rowFormatDelimited = + rowFormat.map { + case ("TOK_TABLEROWFORMATFIELD", value) => + "FIELDS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATCOLLITEMS", value) => + "COLLECTION ITEMS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATMAPKEYS", value) => + "MAP KEYS TERMINATED BY " + value + case ("TOK_TABLEROWFORMATLINES", value) => + "LINES TERMINATED BY " + value + case ("TOK_TABLEROWFORMATNULL", value) => + "NULL DEFINED AS " + value + case o => return None + } + + val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("") + val serdePropsSQL = + if (serdeClass.nonEmpty) { + val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ") + if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else "" + } else { + "" + } + if (rowFormat.nonEmpty) { + Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" ")) + } else { + Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL) + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c4bd5760/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala index f02ecb4..ca46c22 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala @@ -383,6 +383,63 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils { } } + test("script transformation - schemaless") { + checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2") + checkHiveQl("SELECT TRANSFORM (*) USING 'cat' FROM parquet_t2") + } + + test("script transformation - alias list") { + checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' AS (d1, d2, d3, d4) FROM parquet_t2") + } + + test("script transformation - alias list with type") { + checkHiveQl( + """FROM + |(FROM parquet_t1 SELECT TRANSFORM(key, value) USING 'cat' AS (thing1 int, thing2 string)) t + |SELECT thing1 + 1 + """.stripMargin) + } + + test("script transformation - row format delimited clause with only one format property") { + checkHiveQl( + """SELECT TRANSFORM (key) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' + |USING 'cat' AS (tKey) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' + |FROM parquet_t1 + """.stripMargin) + } + + test("script transformation - row format delimited clause with multiple format properties") { + checkHiveQl( + """SELECT TRANSFORM (key) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t' + |USING 'cat' AS (tKey) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t' + |FROM parquet_t1 + """.stripMargin) + } + + test("script transformation - row format serde clauses with SERDEPROPERTIES") { + checkHiveQl( + """SELECT TRANSFORM (key, value) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |WITH SERDEPROPERTIES('field.delim' = '|') + |USING 'cat' AS (tKey, tValue) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |WITH SERDEPROPERTIES('field.delim' = '|') + |FROM parquet_t1 + """.stripMargin) + } + + test("script transformation - row format serde clauses without SERDEPROPERTIES") { + checkHiveQl( + """SELECT TRANSFORM (key, value) + |ROW FORMAT SERDE 
'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |USING 'cat' AS (tKey, tValue) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |FROM parquet_t1 + """.stripMargin) + } + test("plans with non-SQL expressions") { sqlContext.udf.register("foo", (_: Int) * 2) intercept[UnsupportedOperationException](new SQLBuilder(sql("SELECT foo(id) FROM t0")).toSQL) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org