Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12361#discussion_r60339109
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala ---
    @@ -0,0 +1,530 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources
    +
    +import scala.collection.mutable
    +import scala.util.Try
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    +import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
    +import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.execution.FileRelation
    +import org.apache.spark.sql.sources.{BaseRelation, Filter}
    +import org.apache.spark.sql.types.{StringType, StructType}
    +import org.apache.spark.util.SerializableConfiguration
    +
    +/**
    + * ::Experimental::
    + * A factory that produces [[OutputWriter]]s.  A new [[OutputWriterFactory]] is created on the driver
    + * side for each write job issued when writing to a [[HadoopFsRelation]], and then gets serialized
    + * to the executor side to create actual [[OutputWriter]]s on the fly.
    + *
    + * @since 1.4.0
    + */
    +@Experimental
    +abstract class OutputWriterFactory extends Serializable {
    +  /**
    +   * When writing to a [[HadoopFsRelation]], this method gets called by each task on the executor
    +   * side to instantiate new [[OutputWriter]]s.
    +   *
    +   * @param path Path of the file to which this [[OutputWriter]] is supposed to write.  Note that
    +   *        this may not point to the final output file.  For example, `FileOutputFormat` writes to
    +   *        temporary directories and then merges written files back to the final destination.  In
    +   *        this case, `path` points to a temporary output file under the temporary directory.
    +   * @param dataSchema Schema of the rows to be written. Partition columns are not included in the
    +   *        schema if the relation being written is partitioned.
    +   * @param context The Hadoop MapReduce task context.
    +   * @since 1.4.0
    +   */
    +  private[sql] def newInstance(
    +      path: String,
    +      bucketId: Option[Int], // TODO: This doesn't belong here...
    +      dataSchema: StructType,
    +      context: TaskAttemptContext): OutputWriter
    +}
    +
    +/**
    + * ::Experimental::
    + * [[OutputWriter]] is used together with [[HadoopFsRelation]] for persisting rows to the
    + * underlying file system.  Subclasses of [[OutputWriter]] must provide a zero-argument constructor.
    + * An [[OutputWriter]] instance is created and initialized when a new output file is opened on the
    + * executor side.  This instance is used to persist rows to this single output file.
    + *
    + * @since 1.4.0
    + */
    +@Experimental
    +abstract class OutputWriter {
    +  /**
    +   * Persists a single row.  Invoked on the executor side.  When writing to dynamically partitioned
    +   * tables, dynamic partition columns are not included in rows to be written.
    +   *
    +   * @since 1.4.0
    +   */
    +  def write(row: Row): Unit
    +
    +  /**
    +   * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before
    +   * the task output is committed.
    +   *
    +   * @since 1.4.0
    +   */
    +  def close(): Unit
    +
    +  private var converter: InternalRow => Row = _
    +
    +  protected[sql] def initConverter(dataSchema: StructType) = {
    +    converter =
    +      CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
    +  }
    +
    +  protected[sql] def writeInternal(row: InternalRow): Unit = {
    +    write(converter(row))
    +  }
    +}
    +
    +/**
    + * Acts as a container for all of the metadata required to read from a datasource. All discovery,
    + * resolution and merging logic for schemas and partitions has been removed.
    + *
    + * @param location A [[FileCatalog]] that can enumerate the locations of all the files that comprise
    + *                 this relation.
    + * @param partitionSchema The schema of the columns (if any) that are used to partition the relation.
    + * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
    + *                   present in the actual data files as well, they are preserved.
    + * @param bucketSpec Describes the bucketing (hash-partitioning of the files by some column values).
    + * @param fileFormat A file format that can be used to read and write the data in files.
    + * @param options Configuration used when reading / writing data.
    + */
    +case class HadoopFsRelation(
    +    sqlContext: SQLContext,
    +    location: FileCatalog,
    +    partitionSchema: StructType,
    +    dataSchema: StructType,
    +    bucketSpec: Option[BucketSpec],
    +    fileFormat: FileFormat,
    +    options: Map[String, String]) extends BaseRelation with FileRelation {
    +
    +  val schema: StructType = {
    +    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
    +    StructType(dataSchema ++ partitionSchema.filterNot { column =>
    +      dataSchemaColumnNames.contains(column.name.toLowerCase)
    +    })
    +  }
    +
    +  def partitionSchemaOption: Option[StructType] =
    +    if (partitionSchema.isEmpty) None else Some(partitionSchema)
    +  def partitionSpec: PartitionSpec = location.partitionSpec()
    +
    +  def refresh(): Unit = location.refresh()
    +
    +  override def toString: String =
    +    s"HadoopFiles"
    +
    +  /** Returns the list of files that will be read when scanning this relation. */
    +  override def inputFiles: Array[String] =
    +    location.allFiles().map(_.getPath.toUri.toString).toArray
    +
    +  override def sizeInBytes: Long = location.allFiles().map(_.getLen).sum
    +}
    +
    +/**
    + * Used to read and write data stored in files to/from the [[InternalRow]] format.
    + */
    +trait FileFormat {
    +  /**
    +   * When possible, this method should return the schema of the given `files`.  When the format
    +   * does not support inference, or no valid files are given, this method should return None.  In
    +   * these cases Spark will require that the user specify the schema manually.
    +   */
    +  def inferSchema(
    +      sqlContext: SQLContext,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Option[StructType]
    +
    +  /**
    +   * Prepares a read job and returns a potentially updated data source option [[Map]]. This method
    +   * can be useful for collecting necessary global information for scanning input data.
    +   */
    +  def prepareRead(
    +      sqlContext: SQLContext,
    +      options: Map[String, String],
    +      files: Seq[FileStatus]): Map[String, String] = options
    +
    +  /**
    +   * Prepares a write job and returns an [[OutputWriterFactory]].  Client-side job preparation can
    +   * be put here.  For example, a user-defined output committer can be configured here by setting
    +   * the output committer class via the conf key spark.sql.sources.outputCommitterClass.
    +   */
    +  def prepareWrite(
    +      sqlContext: SQLContext,
    +      job: Job,
    +      options: Map[String, String],
    +      dataSchema: StructType): OutputWriterFactory
    +
    +  /**
    +   * Returns whether this format supports returning columnar batches or not.
    +   *
    +   * TODO: we should just have different traits for the different formats.
    +   */
    +  def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
    +    false
    +  }
    +
    +  /**
    +   * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
    +   *
    +   * @param dataSchema The global data schema. It can be either specified by the user, or
    +   *                   reconciled/merged from all underlying data files. If any partition columns
    +   *                   are contained in the files, they are preserved in this schema.
    +   * @param partitionSchema The schema of the partition column row that will be present in each
    +   *                        PartitionedFile. These columns should be appended to the rows that
    +   *                        are produced by the iterator.
    +   * @param requiredSchema The schema of the data that should be output for each row.  This may be a
    +   *                       subset of the columns that are present in the file if column pruning has
    +   *                       occurred.
    +   * @param filters A set of filters that can optionally be used to reduce the number of rows output.
    +   * @param options A set of string -> string configuration options.
    +   * @return
    +   */
    +  def buildReader(
    +      sqlContext: SQLContext,
    --- End diff --
    
    Do we need @param for this?
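    
    For readers following along, a small, hypothetical illustration (not
    part of this PR) of what the schema merging in HadoopFsRelation.schema
    above does: data columns come first, and partition columns whose names
    already appear in the data schema (compared case-insensitively) are
    not duplicated.
    
        import org.apache.spark.sql.types._
    
        val dataSchema      = StructType(Seq(
          StructField("id", LongType), StructField("value", StringType)))
        val partitionSchema = StructType(Seq(
          StructField("date", StringType), StructField("ID", IntegerType)))
    
        // Same logic as HadoopFsRelation.schema in the diff above.
        val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
        val merged = StructType(dataSchema ++ partitionSchema.filterNot { column =>
          dataSchemaColumnNames.contains(column.name.toLowerCase)
        })
    
        // merged.fieldNames => Array("id", "value", "date"); the partition
        // column "ID" is dropped because "id" is already in the data schema.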


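    And a minimal sketch of how the OutputWriterFactory / OutputWriter
    contract above might be implemented. Everything here
    (SimpleTextOutputWriterFactory, SimpleTextOutputWriter, the CSV-ish
    line format) is hypothetical and only meant to show how the pieces fit
    together; it assumes the classes live under the org.apache.spark.sql
    package tree so that the private[sql] newInstance method can be
    implemented, and that the factory passes constructor arguments to the
    writer rather than relying on a zero-argument constructor.
    
        package org.apache.spark.sql.execution.datasources
    
        import org.apache.hadoop.fs.{FSDataOutputStream, Path}
        import org.apache.hadoop.mapreduce.TaskAttemptContext
    
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types.StructType
    
        // Hypothetical example only -- not part of this PR.
        class SimpleTextOutputWriterFactory extends OutputWriterFactory {
    
          // Called once per task on the executor side; `path` may point to a
          // temporary file that the output committer later moves into place.
          private[sql] def newInstance(
              path: String,
              bucketId: Option[Int],
              dataSchema: StructType,
              context: TaskAttemptContext): OutputWriter =
            new SimpleTextOutputWriter(path, context)
        }
    
        class SimpleTextOutputWriter(path: String, context: TaskAttemptContext)
          extends OutputWriter {
    
          private val file = new Path(path)
          private val out: FSDataOutputStream =
            file.getFileSystem(context.getConfiguration).create(file, false)
    
          // Persists one row (partition columns excluded) as a comma-separated line.
          override def write(row: Row): Unit =
            out.write((row.mkString(",") + "\n").getBytes("UTF-8"))
    
          // Releases the underlying stream before the task output is committed.
          override def close(): Unit = out.close()
        }
    
    A FileFormat implementation's prepareWrite could then do any job-level
    setup and return new SimpleTextOutputWriterFactory.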