Repository: spark
Updated Branches:
  refs/heads/branch-1.5 42a0b4890 -> a512250cd


[SPARK-8118] [SQL] Redirects Parquet JUL logger via SLF4J

Parquet hard-codes a JUL logger that always writes to stdout. This PR 
redirects it via the SLF4J JUL bridge handler, so that we can control Parquet 
logs via `log4j.properties`.

This solution is inspired by 
https://github.com/Parquet/parquet-mr/issues/390#issuecomment-46064909.
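
The core of the fix is to detach Parquet's stdout handler and attach the
SLF4J bridge instead. A minimal sketch of the technique (assuming the
`org.slf4j:jul-to-slf4j` bridge is on the classpath; the logger names are the
ones used in the patch below):

    import java.util.logging.{Logger => JLogger}
    import org.slf4j.bridge.SLF4JBridgeHandler

    // Drop Parquet's stdout ConsoleHandler and route JUL records to SLF4J.
    def redirect(logger: JLogger): Unit = {
      logger.getHandlers.foreach(logger.removeHandler)
      logger.setUseParentHandlers(false) // don't fall back to the root console handler
      logger.addHandler(new SLF4JBridgeHandler)
    }

    // parquet-mr 1.7+ logs under "org.apache.parquet"; 1.6 and earlier under "parquet".
    redirect(JLogger.getLogger("org.apache.parquet"))
    redirect(JLogger.getLogger("parquet"))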

Author: Cheng Lian <l...@databricks.com>

Closes #8196 from liancheng/spark-8118/redirect-parquet-jul.

(cherry picked from commit 5723d26d7e677b89383de3fcf2c9a821b68a65b7)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a512250c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a512250c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a512250c

Branch: refs/heads/branch-1.5
Commit: a512250cd19288a7ad8fb600d06544f8728b2dd1
Parents: 42a0b48
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Aug 18 20:15:33 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Aug 18 20:16:13 2015 +0800

----------------------------------------------------------------------
 conf/log4j.properties.template                  |  2 +
 .../datasources/parquet/ParquetRelation.scala   | 77 ++++++++++----------
 .../parquet/ParquetTableSupport.scala           |  1 -
 .../parquet/ParquetTypesConverter.scala         |  3 -
 sql/hive/src/test/resources/log4j.properties    |  7 +-
 5 files changed, 47 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a512250c/conf/log4j.properties.template
----------------------------------------------------------------------
diff --git a/conf/log4j.properties.template b/conf/log4j.properties.template
index 27006e4..74c5cea 100644
--- a/conf/log4j.properties.template
+++ b/conf/log4j.properties.template
@@ -10,6 +10,8 @@ log4j.logger.org.spark-project.jetty=WARN
 log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
 
 # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
 log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL

http://git-wip-us.apache.org/repos/asf/spark/blob/a512250c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 52fac18..68169d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import java.net.URI
-import java.util.logging.{Level, Logger => JLogger}
+import java.util.logging.{Logger => JLogger}
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -31,22 +31,22 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, _}
 import org.apache.parquet.schema.MessageType
-import org.apache.parquet.{Log => ParquetLog}
+import org.apache.parquet.{Log => ApacheParquetLog}
+import org.slf4j.bridge.SLF4JBridgeHandler
 
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
+import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
@@ -759,38 +759,39 @@ private[sql] object ParquetRelation extends Logging {
     }.toOption
   }
 
-  def enableLogForwarding() {
-    // Note: the org.apache.parquet.Log class has a static initializer that
-    // sets the java.util.logging Logger for "org.apache.parquet". This
-    // checks first to see if there's any handlers already set
-    // and if not it creates them. If this method executes prior
-    // to that class being loaded then:
-    //  1) there's no handlers installed so there's none to
-    // remove. But when it IS finally loaded the desired affect
-    // of removing them is circumvented.
-    //  2) The parquet.Log static initializer calls setUseParentHandlers(false)
-    // undoing the attempt to override the logging here.
-    //
-    // Therefore we need to force the class to be loaded.
-    // This should really be resolved by Parquet.
-    Utils.classForName(classOf[ParquetLog].getName)
-
-    // Note: Logger.getLogger("parquet") has a default logger
-    // that appends to Console which needs to be cleared.
-    val parquetLogger = JLogger.getLogger(classOf[ParquetLog].getPackage.getName)
-    parquetLogger.getHandlers.foreach(parquetLogger.removeHandler)
-    parquetLogger.setUseParentHandlers(true)
-
-    // Disables a WARN log message in ParquetOutputCommitter.  We first ensure that
-    // ParquetOutputCommitter is loaded and the static LOG field gets initialized.
-    // See https://issues.apache.org/jira/browse/SPARK-5968 for details
-    Utils.classForName(classOf[ParquetOutputCommitter].getName)
-    JLogger.getLogger(classOf[ParquetOutputCommitter].getName).setLevel(Level.OFF)
-
-    // Similar as above, disables a unnecessary WARN log message in ParquetRecordReader.
-    // See https://issues.apache.org/jira/browse/PARQUET-220 for details
-    Utils.classForName(classOf[ParquetRecordReader[_]].getName)
-    JLogger.getLogger(classOf[ParquetRecordReader[_]].getName).setLevel(Level.OFF)
+  // JUL loggers must be held by a strong reference, otherwise they may get destroyed by GC.
+  // However, the root JUL logger used by Parquet isn't properly referenced.  Here we keep
+  // references to loggers in both parquet-mr <= 1.6 and >= 1.7
+  val apacheParquetLogger: JLogger = JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName)
+  val parquetLogger: JLogger = JLogger.getLogger("parquet")
+
+  // Parquet initializes its own JUL logger in a static block which always prints to stdout.  Here
+  // we redirect the JUL logger via SLF4J JUL bridge handler.
+  val redirectParquetLogsViaSLF4J: Unit = {
+    def redirect(logger: JLogger): Unit = {
+      logger.getHandlers.foreach(logger.removeHandler)
+      logger.setUseParentHandlers(false)
+      logger.addHandler(new SLF4JBridgeHandler)
+    }
+
+    // For parquet-mr 1.7.0 and above versions, which are under `org.apache.parquet` namespace.
+    // scalastyle:off classforname
+    Class.forName(classOf[ApacheParquetLog].getName)
+    // scalastyle:on classforname
+    redirect(JLogger.getLogger(classOf[ApacheParquetLog].getPackage.getName))
+
+    // For parquet-mr 1.6.0 and lower versions bundled with Hive, which are under `parquet`
+    // namespace.
+    try {
+      // scalastyle:off classforname
+      Class.forName("parquet.Log")
+      // scalastyle:on classforname
+      redirect(JLogger.getLogger("parquet"))
+    } catch { case _: Throwable =>
+      // SPARK-9974: com.twitter:parquet-hadoop-bundle:1.6.0 is not packaged into the assembly jar
+      // when Spark is built with SBT. So `parquet.Log` may not be found.  This try/catch block
+      // should be removed after this issue is fixed.
+    }
   }
 
   // The parquet compression short names

http://git-wip-us.apache.org/repos/asf/spark/blob/a512250c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
index 3191cf3..ed89aa2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTableSupport.scala
@@ -52,7 +52,6 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
     }
 
     log.debug(s"write support initialized for requested schema $attributes")
-    ParquetRelation.enableLogForwarding()
     new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a512250c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
index 019db34..42376ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala
@@ -104,7 +104,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
       extraMetadata,
       "Spark")
 
-    ParquetRelation.enableLogForwarding()
     ParquetFileWriter.writeMetadataFile(
       conf,
       path,
@@ -140,8 +139,6 @@ private[parquet] object ParquetTypesConverter extends Logging {
          (name(0) == '.' || name(0) == '_') && name != ParquetFileWriter.PARQUET_METADATA_FILE
         }
 
-    ParquetRelation.enableLogForwarding()
-
     // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
     // groups. Since Parquet schema is replicated among all row groups, we only need to touch a
     // single row group to read schema related metadata. Notice that we are making assumptions that

http://git-wip-us.apache.org/repos/asf/spark/blob/a512250c/sql/hive/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/log4j.properties b/sql/hive/src/test/resources/log4j.properties
index 92eaf1f..fea3404 100644
--- a/sql/hive/src/test/resources/log4j.properties
+++ b/sql/hive/src/test/resources/log4j.properties
@@ -48,9 +48,14 @@ log4j.logger.hive.log=OFF
 log4j.additivity.parquet.hadoop.ParquetRecordReader=false
 log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
 
+log4j.additivity.org.apache.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.org.apache.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.parquet.hadoop.ParquetOutputCommitter=false
+log4j.logger.org.apache.parquet.hadoop.ParquetOutputCommitter=OFF
+
 log4j.additivity.hive.ql.metadata.Hive=false
 log4j.logger.hive.ql.metadata.Hive=OFF
 
 log4j.additivity.org.apache.hadoop.hive.ql.io.RCFile=false
 log4j.logger.org.apache.hadoop.hive.ql.io.RCFile=ERROR
-
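
With the JUL bridge in place, Parquet's verbosity can be tuned from
`log4j.properties` like any other logger, which is what the template change
above does. For example (the levels below are illustrative; the shipped
template uses ERROR):

    # parquet-mr 1.7+ namespace
    log4j.logger.org.apache.parquet=ERROR
    # legacy parquet-mr <= 1.6 namespace (bundled with Hive)
    log4j.logger.parquet=ERROR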

