GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19651#discussion_r152522983
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala ---
    @@ -39,3 +58,122 @@ private[sql] object OrcFileFormat {
         names.foreach(checkFieldName)
       }
     }
    +
    +class DefaultSource extends OrcFileFormat
    +
    +/**
    + * New ORC File Format based on Apache ORC 1.4.1 and above.
    + */
    +class OrcFileFormat
    +  extends FileFormat
    +  with DataSourceRegister
    +  with Serializable {
    +
    +  override def shortName(): String = "orc"
    +
    +  override def toString: String = "ORC_1.4"
    +
    +  override def hashCode(): Int = getClass.hashCode()
    +
    +  override def equals(other: Any): Boolean = other.isInstanceOf[OrcFileFormat]
    +
    +  override def inferSchema(
    +      sparkSession: SparkSession,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType] = {
    +    OrcUtils.readSchema(sparkSession, files)
    +  }
    +
    +  override def prepareWrite(
    +      sparkSession: SparkSession,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory = {
    +    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
    +
    +    val conf = job.getConfiguration
    +
    +    conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, dataSchema.catalogString)
    +
    +    conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
    +
    +    conf.asInstanceOf[JobConf]
    +      .setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])
    +
    +    new OutputWriterFactory {
    +      override def newInstance(
    +          path: String,
    +          dataSchema: StructType,
    +          context: TaskAttemptContext): OutputWriter = {
    +        new OrcOutputWriter(path, dataSchema, context)
    +      }
    +
    +      override def getFileExtension(context: TaskAttemptContext): String = {
    +        val compressionExtension: String = {
    +          val name = context.getConfiguration.get(COMPRESS.getAttribute)
    +          OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "")
    +        }
    +
    +        compressionExtension + ".orc"
    +      }
    +    }
    +  }
    +
    +  override def isSplitable(
    +      sparkSession: SparkSession,
    +      options: Map[String, String],
    +      path: Path): Boolean = {
    +    true
    +  }
    +
    +  override def buildReader(
    +      sparkSession: SparkSession,
    +      dataSchema: StructType,
    +      partitionSchema: StructType,
    +      requiredSchema: StructType,
    +      filters: Seq[Filter],
    +      options: Map[String, String],
    +      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
    +    if (sparkSession.sessionState.conf.orcFilterPushDown) {
    +      OrcFilters.createFilter(dataSchema, filters).foreach { f =>
    +        OrcInputFormat.setSearchArgument(hadoopConf, f, dataSchema.fieldNames)
    +      }
    +    }
    +
    +    val broadcastedConf =
    +      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
    +    val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
    +
    +    (file: PartitionedFile) => {
    +      val conf = broadcastedConf.value.value
    +
    +      // SPARK-8501: Some old empty ORC files always have an empty schema stored in their footer.
    +      // In this case, `getMissingColumnNames` returns `None` and we return an empty iterator.
    +      val maybeMissingColumnNames = OrcUtils.getMissingColumnNames(
    +        isCaseSensitive, dataSchema, partitionSchema, new Path(new URI(file.filePath)), conf)
    +      if (maybeMissingColumnNames.isEmpty) {
    +        Iterator.empty
    --- End diff --
    
    This is confusing. Why does having no missing column names mean the file is empty? The previous code uses an explicit `isEmptyFile` check.
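    
    As a rough sketch of what I mean (assuming a hypothetical `OrcUtils.isEmptyFile(path, conf)` helper mirroring the old code path; the other names are the values already in scope in this diff), naming the condition directly would read much better:
    
    ```scala
    // Hypothetical sketch only: make the empty-file case explicit instead of
    // overloading the meaning of `getMissingColumnNames` returning `None`.
    (file: PartitionedFile) => {
      val conf = broadcastedConf.value.value
      val filePath = new Path(new URI(file.filePath))
    
      if (OrcUtils.isEmptyFile(filePath, conf)) {
        // Old empty ORC files (SPARK-8501) store an empty schema in the footer.
        Iterator.empty
      } else {
        val missingColumnNames = OrcUtils.getMissingColumnNames(
          isCaseSensitive, dataSchema, partitionSchema, filePath, conf)
        // ... continue building the reader with `missingColumnNames` ...
        ???
      }
    }
    ```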


---
