This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d50603a  [SPARK-27271][SQL] Migrate Text to File Data Source V2
d50603a is described below

commit d50603a37c478b299e5bd5c1e04cf2f65e108d1e
Author: Gengliang Wang <gengliang.w...@databricks.com>
AuthorDate: Mon Apr 8 10:15:22 2019 -0700

    [SPARK-27271][SQL] Migrate Text to File Data Source V2
    
    ## What changes were proposed in this pull request?
    
    Migrate Text source to File Data Source V2
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #24207 from gengliangwang/textV2.
    
    Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  2 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |  2 +-
 .../datasources/text/TextFileFormat.scala          | 29 ---------
 .../execution/datasources/text/TextOptions.scala   |  2 +-
 .../datasources/text/TextOutputWriter.scala        | 54 ++++++++++++++++
 .../datasources/v2/text/TextDataSourceV2.scala     | 44 +++++++++++++
 .../v2/text/TextPartitionReaderFactory.scala       | 73 ++++++++++++++++++++++
 .../execution/datasources/v2/text/TextScan.scala   | 61 ++++++++++++++++++
 .../datasources/v2/text/TextScanBuilder.scala      | 38 +++++++++++
 .../execution/datasources/v2/text/TextTable.scala  | 48 ++++++++++++++
 .../datasources/v2/text/TextWriteBuilder.scala     | 70 +++++++++++++++++++++
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  2 +-
 12 files changed, 392 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71a49c2..157be1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1488,7 +1488,7 @@ object SQLConf {
      " register class names for which data source V2 write paths are disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("csv,orc")
+    .createWithDefault("csv,orc,text")
 
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index b686187..be9cb81 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -4,7 +4,7 @@ org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-org.apache.spark.sql.execution.datasources.text.TextFileFormat
+org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 60756e7..d8811c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.text
 
-import java.io.OutputStream
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -143,30 +141,3 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     dataType == StringType
 }
 
-class TextOutputWriter(
-    path: String,
-    dataSchema: StructType,
-    lineSeparator: Array[Byte],
-    context: TaskAttemptContext)
-  extends OutputWriter {
-
-  private var outputStream: Option[OutputStream] = None
-
-  override def write(row: InternalRow): Unit = {
-    val os = outputStream.getOrElse {
-      val newStream = CodecStreams.createOutputStream(context, new Path(path))
-      outputStream = Some(newStream)
-      newStream
-    }
-
-    if (!row.isNullAt(0)) {
-      val utf8string = row.getUTF8String(0)
-      utf8string.writeTo(os)
-    }
-    os.write(lineSeparator)
-  }
-
-  override def close(): Unit = {
-    outputStream.foreach(_.close())
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
index e4e2019..ef13216 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
 /**
  * Options for the Text data source.
  */
-private[text] class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
+class TextOptions(@transient private val parameters: CaseInsensitiveMap[String])
   extends Serializable {
 
   import TextOptions._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala
new file mode 100644
index 0000000..faf6e57
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOutputWriter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.text
+
+import java.io.OutputStream
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
+import org.apache.spark.sql.types.StructType
+
+class TextOutputWriter(
+    path: String,
+    dataSchema: StructType,
+    lineSeparator: Array[Byte],
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private var outputStream: Option[OutputStream] = None
+
+  override def write(row: InternalRow): Unit = {
+    val os = outputStream.getOrElse {
+      val newStream = CodecStreams.createOutputStream(context, new Path(path))
+      outputStream = Some(newStream)
+      newStream
+    }
+
+    if (!row.isNullAt(0)) {
+      val utf8string = row.getUTF8String(0)
+      utf8string.writeTo(os)
+    }
+    os.write(lineSeparator)
+  }
+
+  override def close(): Unit = {
+    outputStream.foreach(_.close())
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
new file mode 100644
index 0000000..f6aa1e9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextDataSourceV2.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TextDataSourceV2 extends FileDataSourceV2 {
+
+  override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[TextFileFormat]
+
+  override def shortName(): String = "text"
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    TextTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+    val paths = getPaths(options)
+    val tableName = getTableName(paths)
+    TextTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+  }
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
new file mode 100644
index 0000000..8788887
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextPartitionReaderFactory.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
+import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, HadoopFileWholeTextReader, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.text.TextOptions
+import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create Text readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcasted serializable Hadoop Configuration.
+ * @param readDataSchema Required schema in the batch scan.
+ * @param partitionSchema Schema of partitions.
+ * @param textOptions Options for reading a text file.
+ */
+case class TextPartitionReaderFactory(
+    sqlConf: SQLConf,
+    broadcastedConf: Broadcast[SerializableConfiguration],
+    readDataSchema: StructType,
+    partitionSchema: StructType,
+    textOptions: TextOptions) extends FilePartitionReaderFactory {
+
+  override def buildReader(file: PartitionedFile): PartitionReader[InternalRow] = {
+    val confValue = broadcastedConf.value.value
+    val reader = if (!textOptions.wholeText) {
+      new HadoopFileLinesReader(file, textOptions.lineSeparatorInRead, confValue)
+    } else {
+      new HadoopFileWholeTextReader(file, confValue)
+    }
+    Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => reader.close()))
+    val iter = if (readDataSchema.isEmpty) {
+      val emptyUnsafeRow = new UnsafeRow(0)
+      reader.map(_ => emptyUnsafeRow)
+    } else {
+      val unsafeRowWriter = new UnsafeRowWriter(1)
+
+      reader.map { line =>
+        // Writes to an UnsafeRow directly
+        unsafeRowWriter.reset()
+        unsafeRowWriter.write(0, line.getBytes, 0, line.getLength)
+        unsafeRowWriter.getRow()
+      }
+    }
+    val fileReader = new PartitionReaderFromIterator[InternalRow](iter)
+    new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+      partitionSchema, file.partitionValues)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
new file mode 100644
index 0000000..202723d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScan.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.text.TextOptions
+import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class TextScan(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    readDataSchema: StructType,
+    readPartitionSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends TextBasedFileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema, options) {
+
+  private val optionsAsScala = options.asScala.toMap
+  private lazy val textOptions: TextOptions = new TextOptions(optionsAsScala)
+
+  override def isSplitable(path: Path): Boolean = {
+    super.isSplitable(path) && !textOptions.wholeText
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    assert(
+      readDataSchema.length <= 1,
+      "Text data source only produces a single data column named \"value\".")
+    val hadoopConf = {
+      val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+      // Hadoop Configurations are case sensitive.
+      sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    }
+    val broadcastedConf = sparkSession.sparkContext.broadcast(
+      new SerializableConfiguration(hadoopConf))
+    TextPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, readDataSchema,
+      readPartitionSchema, textOptions)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala
new file mode 100644
index 0000000..fbe5e16
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextScanBuilder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class TextScanBuilder(
+    sparkSession: SparkSession,
+    fileIndex: PartitioningAwareFileIndex,
+    schema: StructType,
+    dataSchema: StructType,
+    options: CaseInsensitiveStringMap)
+  extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+
+  override def build(): Scan = {
+    TextScan(sparkSession, fileIndex, readDataSchema(), readPartitionSchema(), options)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
new file mode 100644
index 0000000..b8cb61a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class TextTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat])
+  extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+  override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder =
+    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+  override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
+    Some(StructType(Seq(StructField("value", StringType))))
+
+  override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+    new TextWriteBuilder(options, paths, formatName, supportsDataType)
+
+  override def supportsDataType(dataType: DataType): Boolean = dataType == StringType
+
+  override def formatName: String = "Text"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala
new file mode 100644
index 0000000..c00dbc2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextWriteBuilder.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.text
+
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.execution.datasources.text.{TextOptions, TextOutputWriter}
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class TextWriteBuilder(
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    formatName: String,
+    supportsDataType: DataType => Boolean)
+  extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+  private def verifySchema(schema: StructType): Unit = {
+    if (schema.size != 1) {
+      throw new AnalysisException(
+        s"Text data source supports only a single column, and you have ${schema.size} columns.")
+    }
+  }
+
+  override def prepareWrite(
+      sqlConf: SQLConf,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
+
+    val textOptions = new TextOptions(options)
+    val conf = job.getConfiguration
+
+    textOptions.compressionCodec.foreach { codec =>
+      CompressionCodecs.setCodecConfiguration(conf, codec)
+    }
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        new TextOutputWriter(path, dataSchema, textOptions.lineSeparatorInWrite, context)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        ".txt" + CodecStreams.getCompressionExtension(context)
+      }
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 3402ed2..c9ff4fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -930,7 +930,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }
       assert(e.getMessage.contains(
         "The format of the existing table default.appendTextToJson is `JsonFileFormat`. " +
-        "It doesn't match the specified format `TextFileFormat`"))
+        "It doesn't match the specified format"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to