Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19651#discussion_r148935336 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala --- @@ -39,3 +58,134 @@ private[sql] object OrcFileFormat { names.foreach(checkFieldName) } } + +class DefaultSource extends OrcFileFormat + +/** + * New ORC File Format based on Apache ORC 1.4.1 and above. + */ +class OrcFileFormat + extends FileFormat + with DataSourceRegister + with Serializable { + + override def shortName(): String = "orc" + + override def toString: String = "ORC_1.4" + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat] + + override def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + OrcUtils.readSchema(sparkSession, files) + } + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf) + + val conf = job.getConfiguration + + conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getSchemaString(dataSchema)) + + conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec) + + conf.asInstanceOf[JobConf] + .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]]) + + new OutputWriterFactory { + override def newInstance( + path: String, + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new OrcOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + val compressionExtension: String = { + val name = context.getConfiguration.get(COMPRESS.getAttribute) + OrcOptions.extensionsForCompressionCodecNames.getOrElse(name, "") + } + + compressionExtension + ".orc" + } + } + } + + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { + true + } + + override def buildReaderWithPartitionValues( --- End diff -- we should override `buildReader` and return `GenericInternalRow` here. Then the parent class will merge the partition values and output `UnsafeRow`. This is what the current `OrcFileFormat` does and let's keep it first.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org