Repository: spark
Updated Branches:
  refs/heads/master 473552fa5 -> beeafcfd6
Revert "[SPARK-5213] [SQL] Pluggable SQL Parser Support" This reverts commit 3ba5aaab8266822545ac82b9e733fd25cc215a77. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/beeafcfd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/beeafcfd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/beeafcfd Branch: refs/heads/master Commit: beeafcfd6ee1e460c4d564cd1515d8781989b422 Parents: 473552f Author: Patrick Wendell <patr...@databricks.com> Authored: Thu Apr 30 20:33:36 2015 -0700 Committer: Patrick Wendell <patr...@databricks.com> Committed: Thu Apr 30 20:33:36 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/AbstractSparkSQLParser.scala | 11 +-- .../org/apache/spark/sql/catalyst/Dialect.scala | 33 -------- .../spark/sql/catalyst/errors/package.scala | 2 - .../scala/org/apache/spark/sql/SQLContext.scala | 82 ++++---------------- .../org/apache/spark/sql/sources/ddl.scala | 6 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 22 ------ .../org/apache/spark/sql/hive/HiveContext.scala | 41 ++++------ .../apache/spark/sql/hive/test/TestHive.scala | 5 +- .../sql/hive/execution/SQLQuerySuite.scala | 39 +--------- 9 files changed, 42 insertions(+), 199 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala index 2eb3e16..1f3c024 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala @@ 
-25,6 +25,10 @@ import scala.util.parsing.input.CharArrayReader.EofCh import org.apache.spark.sql.catalyst.plans.logical._ +private[sql] object KeywordNormalizer { + def apply(str: String): String = str.toLowerCase() +} + private[sql] abstract class AbstractSparkSQLParser extends StandardTokenParsers with PackratParsers { @@ -38,7 +42,7 @@ private[sql] abstract class AbstractSparkSQLParser } protected case class Keyword(str: String) { - def normalize: String = lexical.normalizeKeyword(str) + def normalize: String = KeywordNormalizer(str) def parser: Parser[String] = normalize } @@ -86,16 +90,13 @@ class SqlLexical extends StdLexical { reserved ++= keywords } - /* Normal the keyword string */ - def normalizeKeyword(str: String): String = str.toLowerCase - delimiters += ( "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")", ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>" ) protected override def processIdent(name: String) = { - val token = normalizeKeyword(name) + val token = KeywordNormalizer(name) if (reserved contains token) Keyword(token) else Identifier(name) } http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala deleted file mode 100644 index 9770034..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst - -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan - -/** - * Root class of SQL Parser Dialect, and we don't guarantee the binary - * compatibility for the future release, let's keep it as the internal - * interface for advanced user. - * - */ -@DeveloperApi -abstract class Dialect { - // this is the main function that will be implemented by sql parser. - def parse(sqlText: String): LogicalPlan -} http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala index 0fd4f9b..bdeb660 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala @@ -38,8 +38,6 @@ package object errors { } } - class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause) - /** * Wraps any exceptions that are thrown while executing `f` in a * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`. 
http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 77f51df..bd4a55f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConversions._ import scala.collection.immutable import scala.language.implicitConversions import scala.reflect.runtime.universe.TypeTag -import scala.util.control.NonFatal import com.google.common.reflect.TypeToken @@ -33,11 +32,9 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor -import org.apache.spark.sql.catalyst.Dialect import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions} import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} @@ -48,45 +45,6 @@ import org.apache.spark.util.Utils import org.apache.spark.{Partition, SparkContext} /** - * Currently we support the default dialect named "sql", associated with the class - * [[DefaultDialect]] - * - * And we can also provide custom SQL Dialect, for example in Spark SQL CLI: - * {{{ - *-- switch to "hiveql" dialect - * spark-sql>SET spark.sql.dialect=hiveql; - * spark-sql>SELECT * FROM src LIMIT 1; - * - *-- switch to "sql" dialect - * spark-sql>SET 
spark.sql.dialect=sql; - * spark-sql>SELECT * FROM src LIMIT 1; - * - *-- register the new SQL dialect - * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect; - * spark-sql> SELECT * FROM src LIMIT 1; - * - *-- register the non-exist SQL dialect - * spark-sql> SET spark.sql.dialect=NotExistedClass; - * spark-sql> SELECT * FROM src LIMIT 1; - * - *-- Exception will be thrown and switch to dialect - *-- "sql" (for SQLContext) or - *-- "hiveql" (for HiveContext) - * }}} - */ -private[spark] class DefaultDialect extends Dialect { - @transient - protected val sqlParser = { - val catalystSqlParser = new catalyst.SqlParser - new SparkSQLParser(catalystSqlParser.parse) - } - - override def parse(sqlText: String): LogicalPlan = { - sqlParser.parse(sqlText) - } -} - -/** * The entry point for working with structured data (rows and columns) in Spark. Allows the * creation of [[DataFrame]] objects as well as the execution of SQL queries. * @@ -174,27 +132,17 @@ class SQLContext(@transient val sparkContext: SparkContext) protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer @transient - protected[sql] val ddlParser = new DDLParser((sql: String) => { getSQLDialect().parse(sql) }) - - protected[sql] def getSQLDialect(): Dialect = { - try { - val clazz = Utils.classForName(dialectClassName) - clazz.newInstance().asInstanceOf[Dialect] - } catch { - case NonFatal(e) => - // Since we didn't find the available SQL Dialect, it will fail even for SET command: - // SET spark.sql.dialect=sql; Let's reset as default dialect automatically. - val dialect = conf.dialect - // reset the sql dialect - conf.unsetConf(SQLConf.DIALECT) - // throw out the exception, and the default sql dialect will take effect for next query. - throw new DialectException( - s"""Instantiating dialect '$dialect' failed. 
- |Reverting to default dialect '${conf.dialect}'""".stripMargin, e) - } + protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) + + @transient + protected[sql] val sqlParser = { + val fallback = new catalyst.SqlParser + new SparkSQLParser(fallback.parse(_)) } - protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false) + protected[sql] def parseSql(sql: String): LogicalPlan = { + ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql)) + } protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) @@ -208,12 +156,6 @@ class SQLContext(@transient val sparkContext: SparkContext) @transient protected[sql] val defaultSession = createSession() - protected[sql] def dialectClassName = if (conf.dialect == "sql") { - classOf[DefaultDialect].getCanonicalName - } else { - conf.dialect - } - sparkContext.getConf.getAll.foreach { case (key, value) if key.startsWith("spark.sql") => setConf(key, value) case _ => @@ -1003,7 +945,11 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group basic */ def sql(sqlText: String): DataFrame = { - DataFrame(this, parseSql(sqlText)) + if (conf.dialect == "sql") { + DataFrame(this, parseSql(sqlText)) + } else { + sys.error(s"Unsupported SQL dialect: ${conf.dialect}") + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 1abf3aa..e7a0685 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -38,12 +38,12 @@ private[sql] class DDLParser( parseQuery: String => LogicalPlan) extends AbstractSparkSQLParser with DataTypeParser with Logging { - def parse(input: String, 
exceptionOnError: Boolean): LogicalPlan = { + def parse(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = { try { - parse(input) + Some(parse(input)) } catch { case ddlException: DDLException => throw ddlException - case _ if !exceptionOnError => parseQuery(input) + case _ if !exceptionOnError => None case x: Throwable => throw x } } http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 0ab8558..d8e7cdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,18 +19,13 @@ package org.apache.spark.sql import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.GeneratedAggregate import org.apache.spark.sql.functions._ import org.apache.spark.sql.TestData._ import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext.{udf => _, _} - import org.apache.spark.sql.types._ -/** A SQL Dialect for testing purpose, and it can not be nested type */ -class MyDialect extends DefaultDialect - class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { // Make sure the tables are loaded. 
TestData @@ -79,23 +74,6 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil) } - test("SQL Dialect Switching to a new SQL parser") { - val newContext = new SQLContext(TestSQLContext.sparkContext) - newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName()) - assert(newContext.getSQLDialect().getClass === classOf[MyDialect]) - assert(newContext.sql("SELECT 1").collect() === Array(Row(1))) - } - - test("SQL Dialect Switch to an invalid parser with alias") { - val newContext = new SQLContext(TestSQLContext.sparkContext) - newContext.sql("SET spark.sql.dialect=MyTestClass") - intercept[DialectException] { - newContext.sql("SELECT 1") - } - // test if the dialect set back to DefaultSQLDialect - assert(newContext.getSQLDialect().getClass === classOf[DefaultDialect]) - } - test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") { checkAnswer( sql("SELECT a FROM testData2 SORT BY a"), http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 1d8d0b5..dd06b26 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql.hive import java.io.{BufferedReader, InputStreamReader, PrintStream} import java.sql.Timestamp -import org.apache.hadoop.hive.ql.parse.VariableSubstitution -import org.apache.spark.sql.catalyst.Dialect - import scala.collection.JavaConversions._ import scala.language.implicitConversions @@ -46,15 +43,6 @@ import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy} import org.apache.spark.sql.types._ /** - * This is the HiveQL Dialect, 
this dialect is strongly bind with HiveContext - */ -private[hive] class HiveQLDialect extends Dialect { - override def parse(sqlText: String): LogicalPlan = { - HiveQl.parseSql(sqlText) - } -} - -/** * An instance of the Spark SQL execution engine that integrates with data stored in Hive. * Configuration for Hive is read from hive-site.xml on the classpath. */ @@ -93,16 +81,25 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { protected[sql] def convertCTAS: Boolean = getConf("spark.sql.hive.convertCTAS", "false").toBoolean - @transient - protected[sql] lazy val substitutor = new VariableSubstitution() - - protected[sql] override def parseSql(sql: String): LogicalPlan = { - super.parseSql(substitutor.substitute(hiveconf, sql)) - } - override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution = new this.QueryExecution(plan) + @transient + protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_)) + + override def sql(sqlText: String): DataFrame = { + val substituted = new VariableSubstitution().substitute(hiveconf, sqlText) + // TODO: Create a framework for registering parsers instead of just hardcoding if statements. + if (conf.dialect == "sql") { + super.sql(substituted) + } else if (conf.dialect == "hiveql") { + val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false) + DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted))) + } else { + sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'") + } + } + /** * Invalidate and refresh all the cached the metadata of the given table. 
For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a @@ -359,12 +356,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { } } - override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") { - classOf[HiveQLDialect].getCanonicalName - } else { - super.dialectClassName - } - @transient private val hivePlanner = new SparkPlanner with HiveStrategies { val hiveContext = self http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index edeab51..9f17bca 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -107,10 +107,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { /** Fewer partitions to speed up testing. */ protected[sql] override lazy val conf: SQLConf = new SQLConf { override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt - - // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared. 
- // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql" - override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql") + override def dialect: String = getConf(SQLConf.DIALECT, "hiveql") } } http://git-wip-us.apache.org/repos/asf/spark/blob/beeafcfd/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 630dec8..4f8d0ac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -18,17 +18,14 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries -import org.apache.spark.sql.catalyst.errors.DialectException -import org.apache.spark.sql.DefaultDialect -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} -import org.apache.spark.sql.hive.MetastoreRelation +import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ -import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim} import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) @@ -48,9 +45,6 @@ case class Order( state: String, month: Int) -/** A SQL Dialect for testing purpose, and it can not be nested type */ -class MyDialect extends DefaultDialect - /** * A collection of hive query tests where we generate the answers 
ourselves instead of depending on * Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is @@ -235,35 +229,6 @@ class SQLQuerySuite extends QueryTest { setConf("spark.sql.hive.convertCTAS", originalConf) } - test("SQL Dialect Switching") { - assert(getSQLDialect().getClass === classOf[HiveQLDialect]) - setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName()) - assert(getSQLDialect().getClass === classOf[MyDialect]) - assert(sql("SELECT 1").collect() === Array(Row(1))) - - // set the dialect back to the DefaultSQLDialect - sql("SET spark.sql.dialect=sql") - assert(getSQLDialect().getClass === classOf[DefaultDialect]) - sql("SET spark.sql.dialect=hiveql") - assert(getSQLDialect().getClass === classOf[HiveQLDialect]) - - // set invalid dialect - sql("SET spark.sql.dialect.abc=MyTestClass") - sql("SET spark.sql.dialect=abc") - intercept[Exception] { - sql("SELECT 1") - } - // test if the dialect set back to HiveQLDialect - getSQLDialect().getClass === classOf[HiveQLDialect] - - sql("SET spark.sql.dialect=MyTestClass") - intercept[DialectException] { - sql("SELECT 1") - } - // test if the dialect set back to HiveQLDialect - assert(getSQLDialect().getClass === classOf[HiveQLDialect]) - } - test("CTAS with serde") { sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect() sql( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org