This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 461a090 Removing job.diagnostics.appender.class config, and
populating it based on if log4j or log4j2 is class-loaded
461a090 is described below
commit 461a090273667322da500829538a2e79dd619e96
Author: Ray Matharu <[email protected]>
AuthorDate: Thu Apr 18 18:24:31 2019 -0700
Removing job.diagnostics.appender.class config, and populating it based on
if log4j or log4j2 is class-loaded
Removing job.diagnostics.appender.class config, and populating it based on
if log4j or log4j2 is class-loaded, defaults to assuming log4j.
Author: Ray Matharu <[email protected]>
Reviewers: Prateek Maheshwari <[email protected]>
Closes #1003 from rmatharu/simplifying-diagnostics-config
---
.../main/scala/org/apache/samza/config/JobConfig.scala | 9 ---------
.../org/apache/samza/container/SamzaContainer.scala | 15 ++++++++++-----
.../src/main/scala/org/apache/samza/util/Util.scala | 17 +++++++++++++++++
3 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 64235cf..8713ec1 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -113,11 +113,6 @@ object JobConfig {
// Enables standby tasks
val STANDBY_TASKS_REPLICATION_FACTOR = "job.standbytasks.replication.factor"
val DEFAULT_STANDBY_TASKS_REPLICATION_FACTOR = 1
-
- // Specify DiagnosticAppender class
- val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
- val DEFAULT_DIAGNOSTICS_APPENDER_CLASS =
"org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
-
val SYSTEM_STREAM_PARTITION_MAPPER_FACTORY =
"job.system.stream.partition.mapper.factory"
implicit def Config2Job(config: Config) = new JobConfig(config)
@@ -261,10 +256,6 @@ class JobConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED,
false) }
- def getDiagnosticsAppenderClass = {
- getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS,
JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
- }
-
def getJMXEnabled = {
getBoolean(JobConfig.JOB_JMX_ENABLED, true);
}
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 307ee35..bf25975 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -888,14 +888,19 @@ class SamzaContainer(
info("Starting diagnostics.")
try {
- val diagnosticsAppender =
Class.forName(config.getDiagnosticsAppenderClass).
-
getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
+ var diagnosticsAppender =
Util.getObj("org.apache.samza.logging.log4j.SimpleDiagnosticsAppender",
(classOf[SamzaContainerMetrics], this.metrics))
+ info("Attached log4j diagnostics appender.")
}
catch {
case e@(_: ClassNotFoundException | _: InstantiationException | _:
InvocationTargetException) => {
- error("Failed to instantiate diagnostic appender", e)
- throw new ConfigException("Failed to instantiate diagnostic appender
class " +
- config.getDiagnosticsAppenderClass, e)
+ try {
+ val diagnosticsAppender =
Util.getObj("org.apache.samza.logging.log4j2.SimpleDiagnosticsAppender",
(classOf[SamzaContainerMetrics], this.metrics))
+ info("Attached log4j2 diagnostics appender.")
+ } catch {
+ case e@(_: ClassNotFoundException | _: InstantiationException | _:
InvocationTargetException) => {
+ warn("Failed to instantiate neither diagnostic appender for
sending error information to diagnostics stream", e)
+ }
+ }
}
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 24ee476..1bb6648 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -20,6 +20,8 @@
package org.apache.samza.util
+import java.lang.reflect.InvocationTargetException
+
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config._
import org.apache.samza.SamzaException
@@ -63,6 +65,21 @@ object Util extends Logging {
}
/**
+ * Instantiate an object from given className, and given constructor
parameters.
+ */
+ def getObj[T](className: String, constructorParams: (Class[_], Object)*) : T
= {
+ try {
+ Class.forName(className).getDeclaredConstructor(constructorParams.map(x
=> x._1): _*)
+ .newInstance(constructorParams.map(x => x._2): _*).asInstanceOf[T]
+ } catch {
+ case e@(_: ClassNotFoundException | _: InstantiationException | _:
InvocationTargetException) => {
+ warn("Could not instantiate an instance for class %s." format
className, e)
+ throw e
+ }
+ }
+ }
+
+ /**
* Returns the the first host address which is not the loopback address, or
[[java.net.InetAddress#getLocalHost]] as a fallback
*
* @return the [[java.net.InetAddress]] which represents the localhost