This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 461a090  Removing job.diagnostics.appender.class config, and 
populating it based on if log4j or log4j2 is class-loaded
461a090 is described below

commit 461a090273667322da500829538a2e79dd619e96
Author: Ray Matharu <[email protected]>
AuthorDate: Thu Apr 18 18:24:31 2019 -0700

    Removing job.diagnostics.appender.class config, and populating it based on 
if log4j or log4j2 is class-loaded
    
    Removing job.diagnostics.appender.class config, and populating it based on 
if log4j or log4j2 is class-loaded, defaults to assuming log4j.
    
    Author: Ray Matharu <[email protected]>
    
    Reviewers: Prateek Maheshwari <[email protected]>
    
    Closes #1003 from rmatharu/simplifying-diagnostics-config
---
 .../main/scala/org/apache/samza/config/JobConfig.scala  |  9 ---------
 .../org/apache/samza/container/SamzaContainer.scala     | 15 ++++++++++-----
 .../src/main/scala/org/apache/samza/util/Util.scala     | 17 +++++++++++++++++
 3 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 64235cf..8713ec1 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -113,11 +113,6 @@ object JobConfig {
   // Enables standby tasks
   val STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor"
   val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1
-
-  // Specify DiagnosticAppender class
-  val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
-  val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = 
"org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
-
   val SYSTEM_STREAM_PARTITION_MAPPER_FACTORY = 
"job.system.stream.partition.mapper.factory"
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
@@ -261,10 +256,6 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
   def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, 
false) }
 
-  def getDiagnosticsAppenderClass = {
-    getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, 
JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
-  }
-
   def getJMXEnabled = {
     getBoolean(JobConfig.JOB_JMX_ENABLED, true);
   }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 307ee35..bf25975 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -888,14 +888,19 @@ class SamzaContainer(
       info("Starting diagnostics.")
 
       try {
-        val diagnosticsAppender = 
Class.forName(config.getDiagnosticsAppenderClass).
-          
getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
+        var diagnosticsAppender = 
Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender", 
(classOf[SamzaContainerMetrics], this.metrics))
+        info("Attached log4j diagnostics appender.")
       }
       catch {
         case e@(_: ClassNotFoundException | _: InstantiationException | _: 
InvocationTargetException) => {
-          error("Failed to instantiate diagnostic appender", e)
-          throw new ConfigException("Failed to instantiate diagnostic appender 
class " +
-            config.getDiagnosticsAppenderClass, e)
+          try {
+            val diagnosticsAppender = 
Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender", 
(classOf[SamzaContainerMetrics], this.metrics))
+            info("Attached log4j2 diagnostics appender.")
+          } catch {
+            case e@(_: ClassNotFoundException | _: InstantiationException | _: 
InvocationTargetException) => {
+              warn("Failed to instantiate neither diagnostic appender for 
sending error information to diagnostics stream", e)
+            }
+          }
         }
       }
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 24ee476..1bb6648 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -20,6 +20,8 @@
 package org.apache.samza.util
 
 
+import java.lang.reflect.InvocationTargetException
+
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config._
 import org.apache.samza.SamzaException
@@ -63,6 +65,21 @@ object Util extends Logging {
   }
 
   /**
+    * Instantiate an object from given className, and given constructor 
parameters.
+    */
+  def getObj[T](className: String, constructorParams: (Class[_], Object)*) : T 
= {
+    try {
+      Class.forName(className).getDeclaredConstructor(constructorParams.map(x 
=> x._1): _*)
+        .newInstance(constructorParams.map(x => x._2): _*).asInstanceOf[T]
+    } catch {
+      case e@(_: ClassNotFoundException | _: InstantiationException | _: 
InvocationTargetException) => {
+        warn("Could not instantiate an instance for class %s." format 
className, e)
+        throw e
+      }
+    }
+  }
+
+  /**
    * Returns the the first host address which is not the loopback address, or 
[[java.net.InetAddress#getLocalHost]] as a fallback
    *
    * @return the [[java.net.InetAddress]] which represents the localhost

Reply via email to