Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/18975#discussion_r155185768

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources._
+
+/**
+ * A command used to write the result of a query to a directory.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   INSERT OVERWRITE DIRECTORY (path=STRING)?
+ *   USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
+ *   SELECT ...
+ * }}}
+ *
+ * @param storage storage format used to describe how the query result is stored.
+ * @param provider the data source type to be used
+ * @param query the logical plan representing data to write to
+ * @param overwrite whether to overwrite the existing directory
+ */
+case class InsertIntoDataSourceDirCommand(
+    storage: CatalogStorageFormat,
+    provider: String,
+    query: LogicalPlan,
+    overwrite: Boolean) extends RunnableCommand {
+
+  override def children: Seq[LogicalPlan] = Seq(query)
+
+  override def run(sparkSession: SparkSession, children: Seq[SparkPlan]): Seq[Row] = {
+    assert(children.length == 1)
+    assert(storage.locationUri.nonEmpty, "Directory path is required")
+    assert(provider.nonEmpty, "Data source is required")
+
+    // Create the relation based on the input logical plan: `query`.
+    val pathOption = storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+
+    val dataSource = DataSource(
+      sparkSession,
+      className = provider,
+      options = storage.properties ++ pathOption,
+      catalogTable = None)
+
+    val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+    if (!isFileFormat) {
+      throw new SparkException(
+        "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+    }
+
+    val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
+    try {
+      sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query))
+      dataSource.writeAndRead(saveMode, query)
--- End diff --

The implementation here confused me a bit, so I just want to leave a question: why do we need to call both `writeAndRead` and `planForWriting`? @janewangfb @gatorsmile @cloud-fan
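For reference, here is a minimal sketch (not the PR's actual code) of what the write path could look like if only `planForWriting` were used. It assumes that `executePlan(...)` alone is lazy and that forcing `.toRdd` on the resulting `QueryExecution` is what actually triggers the write; the `dataSource`, `saveMode`, `query`, and `sparkSession` values are the ones already defined above in this method:

```scala
// Hypothetical alternative, for discussion only: build the write plan once and
// force its execution explicitly, instead of also calling writeAndRead.
// Assumes executePlan(...) is lazy until .toRdd is forced.
val writePlan = dataSource.planForWriting(saveMode, query)
sparkSession.sessionState.executePlan(writePlan).toRdd
```

If both calls are kept as in the current diff, it looks like the data might be written twice, which is what prompted the question.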