Repository: samza
Updated Branches:
  refs/heads/master 058776d65 -> 6d20ee7e4


SAMZA-1951: Make JMX enabled/disabled using configs for Samza containers

Author: Sanil15 <sanil.jai...@gmail.com>
Author: svenkata <svenk...@linkedin.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #717 from Sanil15/SAMZA-1951


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6d20ee7e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6d20ee7e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6d20ee7e

Branch: refs/heads/master
Commit: 6d20ee7e4c4c6797fee3002102309b2ab2bec46e
Parents: 058776d
Author: Sanil15 <sanil.jai...@gmail.com>
Authored: Wed Oct 17 12:02:40 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Wed Oct 17 12:02:40 2018 -0700

----------------------------------------------------------------------
 .../versioned/jobs/samza-configurations.md      |  3 +-
 .../ClusterBasedJobCoordinator.java             |  2 +-
 .../samza/config/ClusterManagerConfig.java      | 29 ++++++++++++++------
 .../org/apache/samza/config/JobConfig.scala     |  5 ++++
 .../apache/samza/container/SamzaContainer.scala | 10 ++++---
 .../samza/job/local/ThreadJobFactory.scala      | 11 ++++++--
 6 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/docs/learn/documentation/versioned/jobs/samza-configurations.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md 
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 0928ee2..5828969 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -55,6 +55,7 @@ These are the basic properties for setting up a Samza 
application.
 |job.default.system| |__Required:__ The system-name to use for creating input 
or output streams for which the system is not explicitly configured. This 
property will also be used as default for `job.coordinator.system`, 
`task.checkpoint.system` and `job.changelog.system` if none are defined.|
 |task.class| |Used for legacy purposes; replace with `app.class` in new jobs. 
The fully-qualified name of the Java class which processes incoming messages 
from input streams. The class must implement 
[StreamTask](../api/javadocs/org/apache/samza/task/StreamTask.html) or 
[AsyncStreamTask](../api/javadocs/org/apache/samza/task/AsyncStreamTask.html), 
and may optionally implement 
[InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html), 
[ClosableTask](../api/javadocs/org/apache/samza/task/ClosableTask.html) and/or 
[WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html). 
The class will be instantiated several times, once for every input stream 
partition.|
 |job.host-affinity.enabled|false|This property indicates whether host-affinity 
is enabled or not. Host-affinity refers to the ability of Samza to request and 
allocate a container on the same host every time the job is deployed. When 
host-affinity is enabled, Samza makes a "best-effort" to honor the 
host-affinity constraint. The property 
`cluster-manager.container.request.timeout.ms` determines how long to wait 
before de-prioritizing the host-affinity constraint and assigning the container 
to any available resource.|
+|job.jmx.enabled|true|Determines whether a JMX server should be started on the 
job's JobCoordinator and Container. (true or false).|
 |task.window.ms|-1|If task.class implements 
[WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html), it 
can receive a windowing callback in regular intervals. This property specifies 
the time between window() calls, in milliseconds. If the number is negative 
(the default), window() is never called. A `window()` call will never  occur 
concurrently with the processing of a message. If a message is being processed 
when a window() call is due, the invocation of window happens after processing 
the message. This property is set automatically when using join or window 
operators in a High Level API StreamApplication Note: task.window.ms should be 
set to be much larger than average process or window call duration to avoid 
starving regular processing.|
 |task.log4j.system| |Specify the system name for the StreamAppender. If this 
property is not specified in the config, an exception will be thrown. (See 
[Stream Log4j Appender](logging.html#stream-log4j-appender)) Example: 
task.log4j.system=kafka|
 |serializers.registry.<br>**_serde-name_**.class| |Use this property to 
register a serializer/deserializer, which defines a way of encoding data as an 
array of bytes (used for messages in streams, and for data in persistent 
storage). You can give a serde any serde-name you want, and reference that name 
in properties like systems.\*.samza.key.serde, systems.\*.samza.msg.serde, 
streams.\*.samza.key.serde, streams.\*.samza.msg.serde, stores.\*.key.serde and 
stores.\*.msg.serde. The value of this property is the fully-qualified name of 
a Java class that implements SerdeFactory. Samza ships with the following serde 
implementations:<br><br>`org.apache.samza.serializers.ByteSerdeFactory`<br>A 
no-op serde which passes through the undecoded byte array. 
<br><br>`org.apache.samza.serializers.ByteBufferSerdeFactory`<br>Encodes 
`java.nio.ByteBuffer` objects. 
<br><br>`org.apache.samza.serializers.IntegerSerdeFactory`<br>Encodes 
`java.lang.Integer` objects as binary (4 bytes fixed-length big-endian 
encoding).<br><br>`org.apache.samza.serializers.StringSerdeFactory`<br>Encodes 
`java.lang.String` objects as UTF-8. 
<br><br>`org.apache.samza.serializers.JsonSerdeFactory`<br>Encodes nested 
structures of `java.util.Map`, `java.util.List` etc. as JSON. Note: This Serde 
enforces a dash-separated property naming convention, while JsonSerdeV2 
doesn't. This serde is primarily meant for Samza's internal usage, and is 
publicly available for backwards 
compatibility.<br><br>`org.apache.samza.serializers.JsonSerdeV2Factory`<br>Encodes
 nested structures of `java.util.Map`, `java.util.List` etc. as JSON. Note: 
This Serde uses Jackson's default (camelCase) property naming convention. This 
serde should be preferred over JsonSerde, especially in High Level API, unless 
the dasherized naming convention is required (e.g., for backwards 
compatibility).<br><br>`org.apache.samza.serializers.LongSerdeFactory`<br>Encodes
 `java.lang.Long` as binary (8 bytes fixed-length big-endian 
encoding).<br><br>`org.apache.samza.serializers.DoubleSerdeFactory`<br>Encodes `java.lang.Double` as 
binary (8 bytes double-precision float point). 
<br><br>`org.apache.samza.serializers.UUIDSerdeFactory`<br>Encodes 
`java.util.UUID` 
objects.<br><br>`org.apache.samza.serializers.SerializableSerdeFactory`<br>Encodes
 `java.io.Serializable` 
objects.<br><br>`org.apache.samza.serializers.MetricsSnapshotSerdeFactory`<br>Encodes
 `org.apache.samza.metrics.reporter.MetricsSnapshot` objects (which are used 
for reporting metrics) as 
JSON.<br><br>`org.apache.samza.serializers.KafkaSerdeFactory`<br>Adapter which 
allows existing `kafka.serializer.Encoder` and `kafka.serializer.Decoder` 
implementations to be used as Samza serdes. Set 
`serializers.registry.serde-name.encoder` and  
`serializers.registry.serde-name.decoder` to the appropriate class names.|
@@ -289,7 +290,7 @@ Samza supports both standalone and clustered 
([YARN](yarn-jobs.html)) [deploymen
 |--- |--- |--- |
 |cluster-manager.container.retry.count|8|If a container fails, it is 
automatically restarted by Samza. However, if a container keeps failing shortly 
after startup, that indicates a deeper problem, so we should kill the job 
rather than retrying indefinitely. This property determines the maximum number 
of times we are willing to restart a failed container in quick succession (the 
time period is configured with `cluster-manager.container.retry.window.ms`). 
Each container in the job is counted separately. If this property is set to 0, 
any failed container immediately causes the whole job to fail. If it is set to 
a negative number, there is no limit on the number of retries.|
 |cluster-manager.container.retry.window.ms|300000|This property determines how 
frequently a container is allowed to fail before we give up and fail the job. 
If the same container has failed more than 
`cluster-manager.container.retry.count` times, and the time between failures 
was less than this property `cluster-manager.container.retry.window.ms` (in 
milliseconds), then we fail the job. There is no limit to the number of times 
we will restart a container if the time between failures is greater than 
`cluster-manager.container.retry.window.ms`.|
-|cluster-manager.jobcoordinator.jmx.enabled|true|Determines whether a JMX 
server should be started on the job's JobCoordinator. (true or false).|
+|cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor 
of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is 
responsible for matching requests to allocated containers. The sleep interval 
for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread 
periodically checks the state of the container requests and allocated 
containers to determine the assignment of a container to an allocated resource. 
This property determines the number of milliseconds before a container request 
is considered to have expired / timed-out. When a request expires, it gets 
allocated to any available container that was returned by the cluster manager.|
 |task.execute|bin/run-container.sh|The command that starts a Samza container. 
The script must be included in the [job package](./packaging.html). There is 
usually no need to customize this.|

http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 12e26f7..ff70df0 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -183,7 +183,7 @@ public class ClusterBasedJobCoordinator {
     systemAdmins = new SystemAdmins(config);
     partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
     clusterManagerConfig = new ClusterManagerConfig(config);
-    isJmxEnabled = clusterManagerConfig.getJmxEnabled();
+    isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
 
     jobCoordinatorSleepInterval = 
clusterManagerConfig.getJobCoordinatorSleepInterval();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index cb5d5c0..cb86a58 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -86,19 +86,27 @@ public class ClusterManagerConfig extends MapConfig {
   private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
 
   /**
-   * Determines whether a JMX server should be started on the job coordinator
-   * Default: true
-   */
-  public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
-  public static final String CLUSTER_MANAGER_JMX_ENABLED = 
"cluster-manager.jobcoordinator.jmx.enabled";
-
-  /**
    * The cluster managed job coordinator sleeps for a configurable time before 
checking again for termination.
    * The sleep interval of the cluster managed job coordinator.
    */
   public static final String CLUSTER_MANAGER_SLEEP_MS = 
"cluster-manager.jobcoordinator.sleep.interval.ms";
   private static final int DEFAULT_CLUSTER_MANAGER_SLEEP_MS = 1000;
 
+  /**
+   * Determines whether a JMX server should be started on JobCoordinator and 
SamzaContainer
+   * Default: true
+   */
+  private static final String JOB_JMX_ENABLED = "job.jmx.enabled";
+
+  /**
+   * Determines whether a JMX server should be started on the job coordinator
+   * Default: true
+   *
+   * @deprecated use {@code JOB_JMX_ENABLED} instead
+   */
+  private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
+  private static final String CLUSTER_MANAGER_JMX_ENABLED = 
"cluster-manager.jobcoordinator.jmx.enabled";
+
   public ClusterManagerConfig(Config config) {
       super(config);
   }
@@ -189,12 +197,15 @@ public class ClusterManagerConfig extends MapConfig {
     return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
   }
 
-  public boolean getJmxEnabled() {
+  public boolean getJmxEnabledOnJobCoordinator() {
     if (containsKey(CLUSTER_MANAGER_JMX_ENABLED)) {
+      log.warn("Configuration {} is deprecated. Please use {}", 
CLUSTER_MANAGER_JMX_ENABLED, JOB_JMX_ENABLED);
       return getBoolean(CLUSTER_MANAGER_JMX_ENABLED);
     } else if (containsKey(AM_JMX_ENABLED)) {
-      log.info("Configuration {} is deprecated. Please use {}", 
AM_JMX_ENABLED, CLUSTER_MANAGER_JMX_ENABLED);
+      log.warn("Configuration {} is deprecated. Please use {}", 
AM_JMX_ENABLED, JOB_JMX_ENABLED);
       return getBoolean(AM_JMX_ENABLED);
+    } else if (containsKey(JOB_JMX_ENABLED)) {
+      return getBoolean(JOB_JMX_ENABLED);
     } else {
       return true;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4f19ade..2bc6420 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -45,6 +45,7 @@ object JobConfig {
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
   val JOB_DEFAULT_SYSTEM = "job.default.system"
+  val JOB_JMX_ENABLED = "job.jmx.enabled"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -208,4 +209,8 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
   def getDiagnosticsAppenderClass = {
     getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, 
JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
   }
+
+  def getJMXEnabled = {
+    getBoolean(JobConfig.JOB_JMX_ENABLED, true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3292986..87ab86b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -800,8 +800,9 @@ class SamzaContainer(
 
       val startTime = System.nanoTime()
       status = SamzaContainerStatus.STARTING
-
-      jmxServer = new JmxServer()
+      if (config.getJMXEnabled) {
+        jmxServer = new JmxServer()
+      }
       applicationContainerContextOption.foreach(_.start)
 
       startMetrics
@@ -840,8 +841,9 @@ class SamzaContainer(
     try {
       info("Shutting down SamzaContainer.")
       removeShutdownHook
-
-      jmxServer.stop
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
 
       shutdownConsumers
       shutdownTask

http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index ee6aff3..5a8d2f8 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -23,7 +23,7 @@ import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, TaskConfigJava}
+import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, 
TaskName}
 import org.apache.samza.context.JobContextImpl
 import org.apache.samza.coordinator.JobModelManager
@@ -74,7 +74,10 @@ class ThreadJobFactory extends StreamJobFactory with Logging 
{
     ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, 
jobModel.maxChangeLogStreamPartitions)
 
     val containerId = "0"
-    val jmxServer = new JmxServer
+    var jmxServer: JmxServer = null
+    if (new JobConfig(config).getJMXEnabled) {
+      jmxServer = new JmxServer();
+    }
 
     val appDesc = 
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), 
config)
     val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
@@ -127,7 +130,9 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
     } finally {
       coordinator.stop
       coordinatorStreamManager.stop()
-      jmxServer.stop
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
     }
   }
 }
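
The lookup order introduced by this commit can be sketched in isolation. The following is a minimal illustration, not the Samza code itself: the class name `JmxConfigSketch` and both method names are hypothetical, a plain `Map` stands in for Samza's `Config`, and `Boolean.parseBoolean` is assumed as a stand-in for `getBoolean` (the real method may treat malformed values differently). Only the three key names and the branch order are taken from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; these are not the Samza classes.
public class JmxConfigSketch {
  // Key names as they appear in the diff.
  static final String JOB_JMX_ENABLED = "job.jmx.enabled";
  static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
  static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";

  // Follows the branch order of getJmxEnabledOnJobCoordinator() in the diff:
  // the two deprecated keys are consulted first, then job.jmx.enabled,
  // and the default is true when none of them is set.
  static boolean jmxEnabledOnJobCoordinator(Map<String, String> config) {
    String[] keysInLookupOrder = {CLUSTER_MANAGER_JMX_ENABLED, AM_JMX_ENABLED, JOB_JMX_ENABLED};
    for (String key : keysInLookupOrder) {
      if (config.containsKey(key)) {
        return Boolean.parseBoolean(config.get(key));
      }
    }
    return true;
  }

  // Follows JobConfig.getJMXEnabled in the diff: only job.jmx.enabled is read,
  // with a default of true.
  static boolean jmxEnabledOnContainer(Map<String, String> config) {
    return Boolean.parseBoolean(config.getOrDefault(JOB_JMX_ENABLED, "true"));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(JOB_JMX_ENABLED, "false");
    config.put(CLUSTER_MANAGER_JMX_ENABLED, "true");

    // The deprecated key is checked first for the job coordinator.
    System.out.println(jmxEnabledOnJobCoordinator(config)); // prints "true"
    // The container path only looks at job.jmx.enabled.
    System.out.println(jmxEnabledOnContainer(config));      // prints "false"
  }
}
```

One consequence of this ordering is that a job which still sets a deprecated key keeps that value on the job coordinator even after `job.jmx.enabled` is added, so the deprecated key has to be removed for the new property to take effect there.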
