Repository: spark Updated Branches: refs/heads/master 50fdd866b -> a0da854fb
[SPARK-16221][SQL] Redirect Parquet JUL logger via SLF4J for WRITE operations

## What changes were proposed in this pull request?

[SPARK-8118](https://github.com/apache/spark/pull/8196) implements redirecting the Parquet JUL logger via SLF4J, but the redirection is currently applied only when a READ operation occurs. If users perform only WRITE operations, Parquet emits many log messages. This PR makes the redirection work for WRITE operations, too.

**Before**
```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Jun 26, 2016 9:04:38 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
............ about 70 lines Parquet Log .............
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
............ about 70 lines Parquet Log .............
```

**After**
```scala
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
scala> spark.range(10).write.format("parquet").mode("overwrite").save("/tmp/p")
```

This PR also fixes some typos.

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13918 from dongjoon-hyun/SPARK-16221.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0da854f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0da854f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0da854f Branch: refs/heads/master Commit: a0da854fb3748aca0128377f0955600cb7a2b5bc Parents: 50fdd86 Author: Dongjoon Hyun <dongj...@apache.org> Authored: Tue Jun 28 13:01:18 2016 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Tue Jun 28 13:01:18 2016 +0800 ---------------------------------------------------------------------- .../datasources/parquet/ParquetFileFormat.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a0da854f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2cce3db..80002d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -129,6 +129,8 @@ private[sql] class ParquetFileFormat conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) } + ParquetFileFormat.redirectParquetLogs() + new OutputWriterFactory { override def newInstance( path: String, @@ -468,9 +470,9 @@ private[sql] class ParquetOutputWriterFactory( override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter { // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter - private val hadoopTaskAttempId = new TaskAttemptID(new 
TaskID(new JobID, TaskType.MAP, 0), 0) + private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0) private val hadoopAttemptContext = new TaskAttemptContextImpl( - serializableConf.value, hadoopTaskAttempId) + serializableConf.value, hadoopTaskAttemptId) // Instance of ParquetRecordWriter that does not use OutputCommitter private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext) @@ -505,7 +507,7 @@ private[sql] class ParquetOutputWriterFactory( dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { throw new UnsupportedOperationException( - "this verison of newInstance not supported for " + + "this version of newInstance not supported for " + "ParquetOutputWriterFactory") } } @@ -665,7 +667,7 @@ private[sql] object ParquetFileFormat extends Logging { Some(Try(DataType.fromJson(serializedSchema.get)) .recover { case _: Throwable => logInfo( - s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + + "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + "falling back to the deprecated DataType.fromCaseClassString parser.") LegacyTypeStringParser.parse(serializedSchema.get) } @@ -880,7 +882,7 @@ private[sql] object ParquetFileFormat extends Logging { Try(DataType.fromJson(schemaString).asInstanceOf[StructType]).recover { case _: Throwable => logInfo( - s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + + "Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + "falling back to the deprecated DataType.fromCaseClassString parser.") LegacyTypeStringParser.parse(schemaString).asInstanceOf[StructType] }.recoverWith { @@ -926,4 +928,9 @@ private[sql] object ParquetFileFormat extends Logging { // should be removed after this issue is fixed. } } + + /** + * ParquetFileFormat.prepareWrite calls this function to initialize `redirectParquetLogsViaSLF4J`. 
+ */ + def redirectParquetLogs(): Unit = {} } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org