[SPARK-15543][SQL] Rename DefaultSources to make them more self-describing

## What changes were proposed in this pull request?

This patch renames various DefaultSources to make their names more self-describing. The choice of "DefaultSource" was from the days when we did not have a good way to specify short names.
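To make the naming contrast concrete, here is a hedged usage sketch (the local master, object name, and `people.json` path are illustrative assumptions, not part of this patch): callers normally use the short name a source registers via `DataSourceRegister.shortName()`, while the old fully qualified `DefaultSource` class names keep resolving through the aliasing described below.

```scala
import org.apache.spark.sql.SparkSession

object DataSourceNamingExample {
  def main(args: Array[String]): Unit = {
    // Local session purely for illustration.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("data-source-naming-example")
      .getOrCreate()

    // Preferred: the short name registered via DataSourceRegister.shortName().
    val viaShortName = spark.read
      .format("json")
      .load("examples/src/main/resources/people.json") // assumed sample file

    // Still supported: a pre-rename fully qualified class name, which DataSource
    // rewrites to JsonFileFormat through its backwardCompatibilityMap.
    val viaOldName = spark.read
      .format("org.apache.spark.sql.json.DefaultSource")
      .load("examples/src/main/resources/people.json")

    assert(viaShortName.schema == viaOldName.schema)
    spark.stop()
  }
}
```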
They are now named:
- LibSVMFileFormat
- CSVFileFormat
- JdbcRelationProvider
- JsonFileFormat
- ParquetFileFormat
- TextFileFormat

Backward compatibility is maintained through aliasing.

## How was this patch tested?

Updated relevant test cases too.

Author: Reynold Xin <r...@databricks.com>

Closes #13311 from rxin/SPARK-15543.

(cherry picked from commit 361ebc282b2d09dc6dcf21419a53c5c617b1b6bd)
Signed-off-by: Reynold Xin <r...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3ee53b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3ee53b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3ee53b8

Branch: refs/heads/branch-2.0
Commit: b3ee53b8424bf7be4f3111744e07a6100a01e4da
Parents: bcad1d1
Author: Reynold Xin <r...@databricks.com>
Authored: Wed May 25 23:54:24 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed May 25 23:54:30 2016 -0700

----------------------------------------------------------------------
 ....apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../spark/ml/source/libsvm/LibSVMRelation.scala |   8 +-
 project/MimaExcludes.scala                      |   4 +-
 ....apache.spark.sql.sources.DataSourceRegister |  10 +-
 .../spark/sql/execution/ExistingRDD.scala       |   2 +-
 .../sql/execution/datasources/DataSource.scala  |  45 +-
 .../datasources/csv/CSVFileFormat.scala         | 185 ++++
 .../datasources/csv/DefaultSource.scala         | 187 ----
 .../datasources/jdbc/DefaultSource.scala        |  55 --
 .../datasources/jdbc/JdbcRelationProvider.scala |  55 ++
 .../datasources/json/JSONRelation.scala         | 198 ----
 .../datasources/json/JsonFileFormat.scala       | 198 ++++
 .../datasources/parquet/ParquetFileFormat.scala | 923 +++++++++++++++++++
 .../datasources/parquet/ParquetRelation.scala   | 923 -------------------
 .../datasources/text/DefaultSource.scala        | 141 ---
 .../datasources/text/TextFileFormat.scala       | 141 +++
 .../execution/datasources/json/JsonSuite.scala  |   4 +-
 .../parquet/ParquetSchemaSuite.scala            |  12 +-
 .../sql/sources/ResolvedDataSourceSuite.scala   |  18 +-
 .../streaming/DataFrameReaderWriterSuite.scala  | 542 -----------
 .../sql/streaming/FileStreamSinkSuite.scala     |   4 +-
 .../test/DataFrameReaderWriterSuite.scala       | 541 +++++++++++
 ....apache.spark.sql.sources.DataSourceRegister |   2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  18 +-
 .../spark/sql/hive/orc/OrcFileFormat.scala      | 375 ++++++++
 .../apache/spark/sql/hive/orc/OrcRelation.scala | 371 --------
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |   2 +-
 27 files changed, 2498 insertions(+), 2468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index f632dd6..a865cbe 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1 @@
-org.apache.spark.ml.source.libsvm.DefaultSource
+org.apache.spark.ml.source.libsvm.LibSVMFileFormat


http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 5ba768d..64ebf0c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -90,7 +90,7 @@ private[libsvm] class LibSVMOutputWriter( * .load("data/mllib/sample_libsvm_data.txt") * * // Java - * DataFrame df = spark.read().format("libsvm") + * Dataset<Row> df = spark.read().format("libsvm") * .option("numFeatures, "780") * .load("data/mllib/sample_libsvm_data.txt"); * }}} @@ -105,9 +105,13 @@ private[libsvm] class LibSVMOutputWriter( * - "vectorType": feature vector type, "sparse" (default) or "dense". * * @see [[https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/ LIBSVM datasets]] + * + * Note that this class is public for documentation purpose. Please don't use this class directly. + * Rather, use the data source API as illustrated above. */ +// If this is moved or renamed, please update DataSource's backwardCompatibilityMap. @Since("1.6.0") -class DefaultSource extends FileFormat with DataSourceRegister { +class LibSVMFileFormat extends FileFormat with DataSourceRegister { @Since("1.6.0") override def shortName(): String = "libsvm" http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4e99a09..08c575a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -68,7 +68,9 @@ object MimaExcludes { // SPARK-13664 Replace HadoopFsRelation with FileFormat ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.LibSVMRelation"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelationProvider"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache"), + // SPARK-15543 Rename DefaultSources to make them more self-describing + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.source.libsvm.DefaultSource") ) ++ Seq( ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"), http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---------------------------------------------------------------------- diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index ef92557..9f8bb5d 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1,6 +1,6 @@ -org.apache.spark.sql.execution.datasources.csv.DefaultSource -org.apache.spark.sql.execution.datasources.jdbc.DefaultSource -org.apache.spark.sql.execution.datasources.json.DefaultSource -org.apache.spark.sql.execution.datasources.parquet.DefaultSource 
-org.apache.spark.sql.execution.datasources.text.DefaultSource +org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider +org.apache.spark.sql.execution.datasources.json.JsonFileFormat +org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +org.apache.spark.sql.execution.datasources.text.TextFileFormat org.apache.spark.sql.execution.streaming.ConsoleSinkProvider http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ec23a9c..412f5fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index d0853f6..dfe0647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -30,6 +30,10 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat +import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider +import org.apache.spark.sql.execution.datasources.json.JsonFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -74,15 +78,34 @@ case class DataSource( lazy val sourceInfo = sourceSchema() /** A map to maintain backward compatibility in case we move data sources around. 
*/ - private val backwardCompatibilityMap = Map( - "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName, - "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName, - "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName, - "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName, - "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName, - "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName, - "com.databricks.spark.csv" -> classOf[csv.DefaultSource].getCanonicalName - ) + private val backwardCompatibilityMap: Map[String, String] = { + val jdbc = classOf[JdbcRelationProvider].getCanonicalName + val json = classOf[JsonFileFormat].getCanonicalName + val parquet = classOf[ParquetFileFormat].getCanonicalName + val csv = classOf[CSVFileFormat].getCanonicalName + val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" + val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat" + + Map( + "org.apache.spark.sql.jdbc" -> jdbc, + "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc, + "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc, + "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc, + "org.apache.spark.sql.json" -> json, + "org.apache.spark.sql.json.DefaultSource" -> json, + "org.apache.spark.sql.execution.datasources.json" -> json, + "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json, + "org.apache.spark.sql.parquet" -> parquet, + "org.apache.spark.sql.parquet.DefaultSource" -> parquet, + "org.apache.spark.sql.execution.datasources.parquet" -> parquet, + "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet, + "org.apache.spark.sql.hive.orc.DefaultSource" -> orc, + "org.apache.spark.sql.hive.orc" -> orc, + "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm, + "org.apache.spark.ml.source.libsvm" -> libsvm, + "com.databricks.spark.csv" -> csv + ) + } /** * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0. 
@@ -188,7 +211,7 @@ case class DataSource( throw new IllegalArgumentException("'path' is not specified") }) val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE) - val isTextSource = providingClass == classOf[text.DefaultSource] + val isTextSource = providingClass == classOf[text.TextFileFormat] // If the schema inference is disabled, only text sources require schema to be specified if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) { throw new IllegalArgumentException( @@ -229,7 +252,7 @@ case class DataSource( providingClass.newInstance() match { case s: StreamSinkProvider => s.createSink(sparkSession.sqlContext, options, partitionColumns) - case parquet: parquet.DefaultSource => + case parquet: parquet.ParquetFileFormat => val caseInsensitiveOptions = new CaseInsensitiveMap(options) val path = caseInsensitiveOptions.getOrElse("path", { throw new IllegalArgumentException("'path' is not specified") http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala new file mode 100644 index 0000000..4d36b76 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.nio.charset.{Charset, StandardCharsets} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce._ + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.SerializableConfiguration + +/** + * Provides access to CSV data from pure SQL statements. 
+ */ +class CSVFileFormat extends FileFormat with DataSourceRegister { + + override def shortName(): String = "csv" + + override def toString: String = "CSV" + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat] + + override def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + val csvOptions = new CSVOptions(options) + + // TODO: Move filtering. + val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) + val rdd = baseRdd(sparkSession, csvOptions, paths) + val firstLine = findFirstLine(csvOptions, rdd) + val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine) + + val header = if (csvOptions.headerFlag) { + firstRow.zipWithIndex.map { case (value, index) => + if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value + } + } else { + firstRow.zipWithIndex.map { case (value, index) => s"_c$index" } + } + + val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths) + val schema = if (csvOptions.inferSchemaFlag) { + CSVInferSchema.infer(parsedRdd, header, csvOptions) + } else { + // By default fields are assumed to be StringType + val schemaFields = header.map { fieldName => + StructField(fieldName.toString, StringType, nullable = true) + } + StructType(schemaFields) + } + Some(schema) + } + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + verifySchema(dataSchema) + val conf = job.getConfiguration + val csvOptions = new CSVOptions(options) + csvOptions.compressionCodec.foreach { codec => + CompressionCodecs.setCodecConfiguration(conf, codec) + } + + new CSVOutputWriterFactory(csvOptions) + } + + override def buildReader( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + val csvOptions = new CSVOptions(options) + val headers = requiredSchema.fields.map(_.name) + + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + (file: PartitionedFile) => { + val lineIterator = { + val conf = broadcastedHadoopConf.value.value + new HadoopFileLinesReader(file, conf).map { line => + new String(line.getBytes, 0, line.getLength, csvOptions.charset) + } + } + + CSVRelation.dropHeaderLine(file, lineIterator, csvOptions) + + val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers) + val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions) + tokenizedIterator.flatMap(parser(_).toSeq) + } + } + + private def baseRdd( + sparkSession: SparkSession, + options: CSVOptions, + inputPaths: Seq[String]): RDD[String] = { + readText(sparkSession, options, inputPaths.mkString(",")) + } + + private def tokenRdd( + sparkSession: SparkSession, + options: CSVOptions, + header: Array[String], + inputPaths: Seq[String]): RDD[Array[String]] = { + val rdd = baseRdd(sparkSession, options, inputPaths) + // Make sure firstLine is materialized before sending to executors + val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null + CSVRelation.univocityTokenizer(rdd, header, firstLine, options) + } + + /** + * Returns the first line of the first non-empty file in path + */ 
+ private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = { + if (options.isCommentSet) { + val comment = options.comment.toString + rdd.filter { line => + line.trim.nonEmpty && !line.startsWith(comment) + }.first() + } else { + rdd.filter { line => + line.trim.nonEmpty + }.first() + } + } + + private def readText( + sparkSession: SparkSession, + options: CSVOptions, + location: String): RDD[String] = { + if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { + sparkSession.sparkContext.textFile(location) + } else { + val charset = options.charset + sparkSession.sparkContext + .hadoopFile[LongWritable, Text, TextInputFormat](location) + .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))) + } + } + + private def verifySchema(schema: StructType): Unit = { + schema.foreach { field => + field.dataType match { + case _: ArrayType | _: MapType | _: StructType => + throw new UnsupportedOperationException( + s"CSV data source does not support ${field.dataType.simpleString} data type.") + case _ => + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala deleted file mode 100644 index 057bde1..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.csv - -import java.nio.charset.{Charset, StandardCharsets} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus -import org.apache.hadoop.io.{LongWritable, Text} -import org.apache.hadoop.mapred.TextInputFormat -import org.apache.hadoop.mapreduce._ - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.JoinedRow -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration - -/** - * Provides access to CSV data from pure SQL statements. 
- */ -class DefaultSource extends FileFormat with DataSourceRegister { - - override def shortName(): String = "csv" - - override def toString: String = "CSV" - - override def hashCode(): Int = getClass.hashCode() - - override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource] - - override def inferSchema( - sparkSession: SparkSession, - options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - val csvOptions = new CSVOptions(options) - - // TODO: Move filtering. - val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString) - val rdd = baseRdd(sparkSession, csvOptions, paths) - val firstLine = findFirstLine(csvOptions, rdd) - val firstRow = new LineCsvReader(csvOptions).parseLine(firstLine) - - val header = if (csvOptions.headerFlag) { - firstRow.zipWithIndex.map { case (value, index) => - if (value == null || value.isEmpty || value == csvOptions.nullValue) s"_c$index" else value - } - } else { - firstRow.zipWithIndex.map { case (value, index) => s"_c$index" } - } - - val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths) - val schema = if (csvOptions.inferSchemaFlag) { - CSVInferSchema.infer(parsedRdd, header, csvOptions) - } else { - // By default fields are assumed to be StringType - val schemaFields = header.map { fieldName => - StructField(fieldName.toString, StringType, nullable = true) - } - StructType(schemaFields) - } - Some(schema) - } - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - verifySchema(dataSchema) - val conf = job.getConfiguration - val csvOptions = new CSVOptions(options) - csvOptions.compressionCodec.foreach { codec => - CompressionCodecs.setCodecConfiguration(conf, codec) - } - - new CSVOutputWriterFactory(csvOptions) - } - - override def buildReader( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val csvOptions = new CSVOptions(options) - val headers = requiredSchema.fields.map(_.name) - - val broadcastedHadoopConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - - (file: PartitionedFile) => { - val lineIterator = { - val conf = broadcastedHadoopConf.value.value - new HadoopFileLinesReader(file, conf).map { line => - new String(line.getBytes, 0, line.getLength, csvOptions.charset) - } - } - - CSVRelation.dropHeaderLine(file, lineIterator, csvOptions) - - val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers) - val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions) - tokenizedIterator.flatMap(parser(_).toSeq) - } - } - - private def baseRdd( - sparkSession: SparkSession, - options: CSVOptions, - inputPaths: Seq[String]): RDD[String] = { - readText(sparkSession, options, inputPaths.mkString(",")) - } - - private def tokenRdd( - sparkSession: SparkSession, - options: CSVOptions, - header: Array[String], - inputPaths: Seq[String]): RDD[Array[String]] = { - val rdd = baseRdd(sparkSession, options, inputPaths) - // Make sure firstLine is materialized before sending to executors - val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null - CSVRelation.univocityTokenizer(rdd, header, firstLine, options) - } - - /** - * Returns the first line of the first non-empty file in path - */ 
- private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = { - if (options.isCommentSet) { - val comment = options.comment.toString - rdd.filter { line => - line.trim.nonEmpty && !line.startsWith(comment) - }.first() - } else { - rdd.filter { line => - line.trim.nonEmpty - }.first() - } - } - - private def readText( - sparkSession: SparkSession, - options: CSVOptions, - location: String): RDD[String] = { - if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { - sparkSession.sparkContext.textFile(location) - } else { - val charset = options.charset - sparkSession.sparkContext - .hadoopFile[LongWritable, Text, TextInputFormat](location) - .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))) - } - } - - private def verifySchema(schema: StructType): Unit = { - schema.foreach { field => - field.dataType match { - case _: ArrayType | _: MapType | _: StructType => - throw new UnsupportedOperationException( - s"CSV data source does not support ${field.dataType.simpleString} data type.") - case _ => - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala deleted file mode 100644 index 6609e5d..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.jdbc - -import java.util.Properties - -import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} - -class DefaultSource extends RelationProvider with DataSourceRegister { - - override def shortName(): String = "jdbc" - - /** Returns a new base relation with the given parameters. 
*/ - override def createRelation( - sqlContext: SQLContext, - parameters: Map[String, String]): BaseRelation = { - val jdbcOptions = new JDBCOptions(parameters) - if (jdbcOptions.partitionColumn != null - && (jdbcOptions.lowerBound == null - || jdbcOptions.upperBound == null - || jdbcOptions.numPartitions == null)) { - sys.error("Partitioning incompletely specified") - } - - val partitionInfo = if (jdbcOptions.partitionColumn == null) { - null - } else { - JDBCPartitioningInfo( - jdbcOptions.partitionColumn, - jdbcOptions.lowerBound.toLong, - jdbcOptions.upperBound.toLong, - jdbcOptions.numPartitions.toInt) - } - val parts = JDBCRelation.columnPartition(partitionInfo) - val properties = new Properties() // Additional properties that we will pass to getConnection - parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) - JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala new file mode 100644 index 0000000..106ed1d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcRelationProvider.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.jdbc + +import java.util.Properties + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider} + +class JdbcRelationProvider extends RelationProvider with DataSourceRegister { + + override def shortName(): String = "jdbc" + + /** Returns a new base relation with the given parameters. 
*/ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + val jdbcOptions = new JDBCOptions(parameters) + if (jdbcOptions.partitionColumn != null + && (jdbcOptions.lowerBound == null + || jdbcOptions.upperBound == null + || jdbcOptions.numPartitions == null)) { + sys.error("Partitioning incompletely specified") + } + + val partitionInfo = if (jdbcOptions.partitionColumn == null) { + null + } else { + JDBCPartitioningInfo( + jdbcOptions.partitionColumn, + jdbcOptions.lowerBound.toLong, + jdbcOptions.upperBound.toLong, + jdbcOptions.numPartitions.toInt) + } + val parts = JDBCRelation.columnPartition(partitionInfo) + val properties = new Properties() // Additional properties that we will pass to getConnection + parameters.foreach(kv => properties.setProperty(kv._1, kv._2)) + JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala deleted file mode 100644 index 4c97abe..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.datasources.json - -import java.io.CharArrayWriter - -import com.fasterxml.jackson.core.JsonFactory -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.io.{LongWritable, NullWritable, Text} -import org.apache.hadoop.mapred.{JobConf, TextInputFormat} -import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat - -import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.JoinedRow -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration - -class DefaultSource extends FileFormat with DataSourceRegister { - - override def shortName(): String = "json" - - override def inferSchema( - sparkSession: SparkSession, - options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - if (files.isEmpty) { - None - } else { - val parsedOptions: JSONOptions = new JSONOptions(options) - val columnNameOfCorruptRecord = - parsedOptions.columnNameOfCorruptRecord - .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) - val jsonFiles = files.filterNot { status => - val name = status.getPath.getName - name.startsWith("_") || name.startsWith(".") - }.toArray - - val jsonSchema = InferSchema.infer( - createBaseRdd(sparkSession, jsonFiles), - columnNameOfCorruptRecord, - parsedOptions) - checkConstraints(jsonSchema) - - Some(jsonSchema) - } - } - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - val conf = job.getConfiguration - val parsedOptions: JSONOptions = new JSONOptions(options) - parsedOptions.compressionCodec.foreach { codec => - CompressionCodecs.setCodecConfiguration(conf, codec) - } - - new OutputWriterFactory { - override def newInstance( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - new JsonOutputWriter(path, bucketId, dataSchema, context) - } - } - } - - override def buildReader( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { - val broadcastedHadoopConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - - val parsedOptions: JSONOptions = new JSONOptions(options) - val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord - .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) - - (file: PartitionedFile) => { - val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString) - - JacksonParser.parseJson( - lines, - requiredSchema, - columnNameOfCorruptRecord, - parsedOptions) - } - } - - private def createBaseRdd( - sparkSession: SparkSession, - inputPaths: Seq[FileStatus]): RDD[String] = { - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) 
- val conf = job.getConfiguration - - val paths = inputPaths.map(_.getPath) - - if (paths.nonEmpty) { - FileInputFormat.setInputPaths(job, paths: _*) - } - - sparkSession.sparkContext.hadoopRDD( - conf.asInstanceOf[JobConf], - classOf[TextInputFormat], - classOf[LongWritable], - classOf[Text]).map(_._2.toString) // get the text line - } - - /** Constraints to be imposed on schema to be stored. */ - private def checkConstraints(schema: StructType): Unit = { - if (schema.fieldNames.length != schema.fieldNames.distinct.length) { - val duplicateColumns = schema.fieldNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - }.mkString(", ") - throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + - s"cannot save to JSON format") - } - } - - override def toString: String = "JSON" - - override def hashCode(): Int = getClass.hashCode() - - override def equals(other: Any): Boolean = other.isInstanceOf[DefaultSource] -} - -private[json] class JsonOutputWriter( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext) - extends OutputWriter with Logging { - - private[this] val writer = new CharArrayWriter() - // create the Generator without separator inserted between 2 records - private[this] val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) - private[this] val result = new Text() - - private val recordWriter: RecordWriter[NullWritable, Text] = { - new TextOutputFormat[NullWritable, Text]() { - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - val configuration = context.getConfiguration - val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") - val taskAttemptId = context.getTaskAttemptID - val split = taskAttemptId.getTaskID.getId - val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension") - } - }.getRecordWriter(context) - } - - override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - - override protected[sql] def writeInternal(row: InternalRow): Unit = { - JacksonGenerator(dataSchema, gen)(row) - gen.flush() - - result.set(writer.toString) - writer.reset() - - recordWriter.write(NullWritable.get(), result) - } - - override def close(): Unit = { - gen.close() - recordWriter.close(context) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/b3ee53b8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala new file mode 100644 index 0000000..35f2476 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.json + +import java.io.CharArrayWriter + +import com.fasterxml.jackson.core.JsonFactory +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, NullWritable, Text} +import org.apache.hadoop.mapred.{JobConf, TextInputFormat} +import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class JsonFileFormat extends FileFormat with DataSourceRegister { + + override def shortName(): String = "json" + + override def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + if (files.isEmpty) { + None + } else { + val parsedOptions: JSONOptions = new JSONOptions(options) + val columnNameOfCorruptRecord = + parsedOptions.columnNameOfCorruptRecord + .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) + val jsonFiles = files.filterNot { status => + val name = status.getPath.getName + name.startsWith("_") || name.startsWith(".") + }.toArray + + val jsonSchema = InferSchema.infer( + createBaseRdd(sparkSession, jsonFiles), + columnNameOfCorruptRecord, + parsedOptions) + checkConstraints(jsonSchema) + + Some(jsonSchema) + } + } + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val conf = job.getConfiguration + val parsedOptions: JSONOptions = new JSONOptions(options) + parsedOptions.compressionCodec.foreach { codec => + CompressionCodecs.setCodecConfiguration(conf, codec) + } + + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new JsonOutputWriter(path, bucketId, dataSchema, context) + } + } + } + + override def buildReader( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + val parsedOptions: JSONOptions = new JSONOptions(options) + val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord + 
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord) + + (file: PartitionedFile) => { + val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString) + + JacksonParser.parseJson( + lines, + requiredSchema, + columnNameOfCorruptRecord, + parsedOptions) + } + } + + private def createBaseRdd( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus]): RDD[String] = { + val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val conf = job.getConfiguration + + val paths = inputPaths.map(_.getPath) + + if (paths.nonEmpty) { + FileInputFormat.setInputPaths(job, paths: _*) + } + + sparkSession.sparkContext.hadoopRDD( + conf.asInstanceOf[JobConf], + classOf[TextInputFormat], + classOf[LongWritable], + classOf[Text]).map(_._2.toString) // get the text line + } + + /** Constraints to be imposed on schema to be stored. */ + private def checkConstraints(schema: StructType): Unit = { + if (schema.fieldNames.length != schema.fieldNames.distinct.length) { + val duplicateColumns = schema.fieldNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => "\"" + x + "\"" + }.mkString(", ") + throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + + s"cannot save to JSON format") + } + } + + override def toString: String = "JSON" + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat] +} + +private[json] class JsonOutputWriter( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext) + extends OutputWriter with Logging { + + private[this] val writer = new CharArrayWriter() + // create the Generator without separator inserted between 2 records + private[this] val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null) + private[this] val result = new Text() + + private val recordWriter: RecordWriter[NullWritable, Text] = { + new TextOutputFormat[NullWritable, Text]() { + override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { + val configuration = context.getConfiguration + val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID") + val taskAttemptId = context.getTaskAttemptID + val split = taskAttemptId.getTaskID.getId + val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") + new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension") + } + }.getRecordWriter(context) + } + + override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") + + override protected[sql] def writeInternal(row: InternalRow): Unit = { + JacksonGenerator(dataSchema, gen)(row) + gen.flush() + + result.set(writer.toString) + writer.reset() + + recordWriter.write(NullWritable.get(), result) + } + + override def close(): Unit = { + gen.close() + recordWriter.close(context) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org