GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19651#discussion_r148935336
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
    @@ -39,3 +58,134 @@ private[sql] object OrcFileFormat {
         names.foreach(checkFieldName)
       }
     }
    +
    +class DefaultSource extends OrcFileFormat
    +
    +/**
    + * New ORC File Format based on Apache ORC 1.4.1 and above.
    + */
    +class OrcFileFormat
    +  extends FileFormat
    +  with DataSourceRegister
    +  with Serializable {
    +
    +  override def shortName(): String = "orc"
    +
    +  override def toString: String = "ORC_1.4"
    +
    +  override def hashCode(): Int = getClass.hashCode()
    +
    +  override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]
    +
    +  override def inferSchema(
    +      sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    OrcUtils.readSchema(sparkSession, files)
    +  }
    +
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
    +
    +    val conf = job.getConfiguration
    +
    +    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcUtils.getSchemaString(dataSchema))
    +
    +    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
    +
    +    conf.asInstanceOf[JobConf]
    +      .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
    +
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        new OrcOutputWriter(path, dataSchema, context)
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        val compressionExtension: String = {
    +          val name = context.getConfiguration.get(COMPRESS.getAttribute)
    +          OrcOptions.extensionsForCompressionCodecNames.getOrElse(name, "")
    +        }
    +
    +        compressionExtension + ".orc"
    +      }
    +    }
    +  }
    +
    +  override def isSplitable(
    +      sparkSession: SparkSession,
    +      options: Map[String, String],
    +      path: Path): Boolean = {
    +    true
    +  }
    +
    +  override def buildReaderWithPartitionValues(
    --- End diff --
    
    We should override `buildReader` and return `GenericInternalRow` here. The
    parent class will then merge in the partition values and output `UnsafeRow`.
    This is what the current `OrcFileFormat` does; let's keep that behavior for
    now.

