spark git commit: [SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class
Repository: spark Updated Branches: refs/heads/master 7bac2fe77 - c337844ed [SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class `HadoopFsRelation` subclasses, especially `ParquetRelation2` should set its own output format class, so that the default output committer can be setup correctly when doing appending (where we ignore user defined output committers). Author: Cheng Lian l...@databricks.com Closes #6998 from liancheng/spark-8604 and squashes the following commits: 9be51d1 [Cheng Lian] Adds more comments 6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c337844e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c337844e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c337844e Branch: refs/heads/master Commit: c337844ed7f9b2cb7b217dc935183ef5e1096ca1 Parents: 7bac2fe Author: Cheng Lian l...@databricks.com Authored: Thu Jun 25 00:06:23 2015 -0700 Committer: Cheng Lian l...@databricks.com Committed: Thu Jun 25 00:06:23 2015 -0700 -- .../apache/spark/sql/parquet/newParquet.scala | 6 ++ .../apache/spark/sql/hive/orc/OrcRelation.scala | 12 ++- .../spark/sql/sources/SimpleTextRelation.scala | 2 ++ .../sql/sources/hadoopFsRelationSuites.scala| 21 4 files changed, 40 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c337844e/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 1d353bd..bc39fae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -194,6 +194,12 @@ private[sql] class ParquetRelation2( committerClass, classOf[ParquetOutputCommitter]) +// We're 
not really using `ParquetOutputFormat[Row]` for writing data here, because we override +// it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why +// we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is +// bundled with `ParquetOutputFormat[Row]`. +job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + // TODO There's no need to use two kinds of WriteSupport // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and // complex types. http://git-wip-us.apache.org/repos/asf/spark/blob/c337844e/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 705f48f..0fd7b3a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter} +import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -194,6 +194,16 @@ private[sql] class OrcRelation( } override def prepareJobForWrite(job: Job): OutputWriterFactory = { +job.getConfiguration match { + case conf: JobConf => +conf.setOutputFormat(classOf[OrcOutputFormat]) + case conf => +conf.setClass( + "mapred.output.format.class", + classOf[OrcOutputFormat], + 
classOf[MapRedOutputFormat[_, _]]) +} + new OutputWriterFactory { override def newInstance( path: String, http://git-wip-us.apache.org/repos/asf/spark/blob/c337844e/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 5d7cd16..e814192
spark git commit: [SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class
Repository: spark Updated Branches: refs/heads/branch-1.4 792ed7a4b - 0605e0843 [SPARK-8604] [SQL] HadoopFsRelation subclasses should set their output format class `HadoopFsRelation` subclasses, especially `ParquetRelation2` should set its own output format class, so that the default output committer can be setup correctly when doing appending (where we ignore user defined output committers). Author: Cheng Lian l...@databricks.com Closes #6998 from liancheng/spark-8604 and squashes the following commits: 9be51d1 [Cheng Lian] Adds more comments 6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class (cherry picked from commit c337844ed7f9b2cb7b217dc935183ef5e1096ca1) Signed-off-by: Cheng Lian l...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0605e084 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0605e084 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0605e084 Branch: refs/heads/branch-1.4 Commit: 0605e08434e8c1d5f7d6ef766ea6eb94ba6ac92f Parents: 792ed7a Author: Cheng Lian l...@databricks.com Authored: Thu Jun 25 00:06:23 2015 -0700 Committer: Cheng Lian l...@databricks.com Committed: Thu Jun 25 00:07:01 2015 -0700 -- .../apache/spark/sql/parquet/newParquet.scala | 6 ++ .../apache/spark/sql/hive/orc/OrcRelation.scala | 12 ++- .../spark/sql/sources/SimpleTextRelation.scala | 2 ++ .../sql/sources/hadoopFsRelationSuites.scala| 21 4 files changed, 40 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index abf9614..36b0047 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -195,6 +195,12 @@ private[sql] class ParquetRelation2( committerClass, classOf[ParquetOutputCommitter]) +// We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override +// it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why +// we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is +// bundled with `ParquetOutputFormat[Row]`. +job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) + // TODO There's no need to use two kinds of WriteSupport // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and // complex types. http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 101f2ff..3713f6f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter} +import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -193,6 +193,16 @@ private[sql] class OrcRelation( } override def prepareJobForWrite(job: Job): OutputWriterFactory = { 
+job.getConfiguration match { + case conf: JobConf => +conf.setOutputFormat(classOf[OrcOutputFormat]) + case conf => +conf.setClass( + "mapred.output.format.class", + classOf[OrcOutputFormat], + classOf[MapRedOutputFormat[_, _]]) +} + new OutputWriterFactory { override def newInstance( path: String, http://git-wip-us.apache.org/repos/asf/spark/blob/0605e084/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala -- diff --git