Repository: spark Updated Branches: refs/heads/master cfcfc92f7 -> f7c07db85
[SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/16552 , `CreateHiveTableAsSelectCommand` becomes very similar to `CreateDataSourceTableAsSelectCommand`, and we can further simplify it by only creating the table in the table-not-exist branch. This PR also adds hive provider checking in DataStream reader/writer, which was missed in #16552. ## How was this patch tested? N/A Author: Wenchen Fan <wenc...@databricks.com> Closes #16693 from cloud-fan/minor. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7c07db8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7c07db8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7c07db8 Branch: refs/heads/master Commit: f7c07db852f22d694ca49792e4ceae04d45b71ef Parents: cfcfc92 Author: Wenchen Fan <wenc...@databricks.com> Authored: Sat Jan 28 20:38:03 2017 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Sat Jan 28 20:38:03 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/streaming/DataStreamReader.scala | 6 ++ .../spark/sql/streaming/DataStreamWriter.scala | 6 ++ .../CreateHiveTableAsSelectCommand.scala | 66 ++++++++------------ .../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++- 4 files changed, 49 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 7db9d92..6d2cede 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.StreamingRelation import org.apache.spark.sql.types.StructType @@ -116,6 +117,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * @since 2.0.0 */ def load(): DataFrame = { + if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) { + throw new AnalysisException("Hive data source can only be used with tables, you can not " + + "read files of Hive data source directly.") + } + val dataSource = DataSource( sparkSession, http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 7e7a1ba..0f7a337 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ +import org.apache.spark.sql.execution.command.DDLUtils import 
org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink} @@ -221,6 +222,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * @since 2.0.0 */ def start(): StreamingQuery = { + if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) { + throw new AnalysisException("Hive data source can only be used with tables, you can not " + + "write files of Hive data source directly.") + } + if (source == "memory") { assertNotPartitioned("memory") if (extraOptions.get("queryName").isEmpty) { http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 2c754d7..41c6b18 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution import scala.util.control.NonFatal import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.hive.MetastoreRelation /** @@ -44,40 +44,6 @@ case class CreateHiveTableAsSelectCommand( 
override def innerChildren: Seq[LogicalPlan] = Seq(query) override def run(sparkSession: SparkSession): Seq[Row] = { - lazy val metastoreRelation: MetastoreRelation = { - import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat - import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe - import org.apache.hadoop.io.Text - import org.apache.hadoop.mapred.TextInputFormat - - val withFormat = - tableDesc.withNewStorage( - inputFormat = - tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)), - outputFormat = - tableDesc.storage.outputFormat - .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)), - serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)), - compressed = tableDesc.storage.compressed) - - val withSchema = if (withFormat.schema.isEmpty) { - tableDesc.copy(schema = query.schema) - } else { - withFormat - } - - sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true) - - // Get the Metastore Relation - sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match { - case SubqueryAlias(_, r: SimpleCatalogRelation, _) => - val tableMeta = r.metadata - MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession) - } - } - // TODO ideally, we should get the output data ready first and then - // add the relation into catalog, just in case of failure occurs while data - // processing. if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) { assert(mode != SaveMode.Overwrite, s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite") @@ -89,12 +55,30 @@ case class CreateHiveTableAsSelectCommand( // Since the table already exists and the save mode is Ignore, we will just return. 
return Seq.empty } - sparkSession.sessionState.executePlan(InsertIntoTable( - metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd + + sparkSession.sessionState.executePlan( + InsertIntoTable( + UnresolvedRelation(tableIdentifier), + Map(), + query, + overwrite = false, + ifNotExists = false)).toRdd } else { + // TODO ideally, we should get the output data ready first and then + // add the relation into catalog, just in case of failure occurs while data + // processing. + assert(tableDesc.schema.isEmpty) + sparkSession.sessionState.catalog.createTable( + tableDesc.copy(schema = query.schema), ignoreIfExists = false) + try { - sparkSession.sessionState.executePlan(InsertIntoTable( - metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd + sparkSession.sessionState.executePlan( + InsertIntoTable( + UnresolvedRelation(tableIdentifier), + Map(), + query, + overwrite = true, + ifNotExists = false)).toRdd } catch { case NonFatal(e) => // drop the created table. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2827183..58be079 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { - import spark.implicits._ + import testImplicits._ override def afterEach(): Unit = { try { @@ -1425,6 +1425,17 @@ class HiveDDLSuite Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath) } assert(e2.message.contains("Hive data source can only be used with tables")) + + val e3 = intercept[AnalysisException] { + spark.readStream.format("hive").load(dir.getAbsolutePath) + } + assert(e3.message.contains("Hive data source can only be used with tables")) + + val e4 = intercept[AnalysisException] { + spark.readStream.schema(new StructType()).parquet(dir.getAbsolutePath) + .writeStream.format("hive").start(dir.getAbsolutePath) + } + assert(e4.message.contains("Hive data source can only be used with tables")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org