Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/5526#discussion_r28425682 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala --- @@ -197,3 +233,69 @@ trait InsertableRelation { trait CatalystScan { def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] } + +/** + * ::Experimental:: + * [[OutputWriter]] is used together with [[FSBasedRelation]] for persisting rows to the + * underlying file system. An [[OutputWriter]] instance is created when a new output file is + * opened. This instance is used to persist rows to this single output file. + */ +@Experimental +trait OutputWriter { + /** + * Persists a single row. Invoked on the executor side. + */ + def write(row: Row): Unit + + /** + * Closes the [[OutputWriter]]. Invoked on the executor side after all rows are persisted, before + * the task output is committed. + */ + def close(): Unit +} + +/** + * ::Experimental:: + * A [[BaseRelation]] that abstracts file system based data sources. + * + * For the read path, similar to [[PrunedFilteredScan]], it can eliminate unneeded columns and + * filter using selected predicates before producing an RDD containing all matching tuples as + * [[Row]] objects. + * + * In addition, when reading from Hive style partitioned tables stored in file systems, it's able to + * discover partitioning information from the paths of input directories, and perform partition + * pruning before start reading the data. + * + * For the write path, it provides the ability to write to both non-partitioned and partitioned + * tables. Directory layout of the partitioned tables is compatible with Hive. + */ +@Experimental +trait FSBasedRelation extends BaseRelation { + /** + * Builds an `RDD[Row]` containing all rows within this relation. + * + * @param requiredColumns Required columns. + * @param filters Candidate filters to be pushed down. The actual filter should be the conjunction + * of all `filters`. The pushed down filters are currently purely an optimization as they + * will all be evaluated again. This means it is safe to use them with methods that produce + * false positives such as filtering partitions based on a bloom filter. + * @param inputPaths Data files to be read. If the underlying relation is partitioned, only data + * files within required partition directories are included. + */ + def buildScan( + requiredColumns: Array[String], + filters: Array[Filter], + inputPaths: Array[String]): RDD[Row] + + /** + * When writing rows to this relation, this method is invoked on the driver side before the actual + * write job is issued. It provides an opportunity to configure the write job to be performed. + */ + def prepareForWrite(conf: Configuration): Unit --- End diff -- @rxin I decided to keep this method. For data sources like Hive, Parquet, and ORC, although the driver side preparation work done before issuing the write job can be somehow converted to executor side, we still need another similar hook to do the work. Constructor of `OutputWriter` instances is not a proper place to do the executor side preparation. Because in case of dynamic partitioning, a single task may create multiple `OutputWriter`, while the preparation should be done only once. Another reason is that, for traditional Hadoop users, driver side setup code can be pretty conventional and familiar.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org