Repository: samza
Updated Branches:
  refs/heads/master 058776d65 -> 6d20ee7e4
SAMZA-1951: Make JMX enabled/disabled using configs for Samza containers

Author: Sanil15 <sanil.jai...@gmail.com>
Author: svenkata <svenk...@linkedin.com>

Reviewers: Jagadish<jagad...@apache.org>

Closes #717 from Sanil15/SAMZA-1951


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/6d20ee7e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/6d20ee7e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/6d20ee7e

Branch: refs/heads/master
Commit: 6d20ee7e4c4c6797fee3002102309b2ab2bec46e
Parents: 058776d
Author: Sanil15 <sanil.jai...@gmail.com>
Authored: Wed Oct 17 12:02:40 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Wed Oct 17 12:02:40 2018 -0700

----------------------------------------------------------------------
 .../versioned/jobs/samza-configurations.md      |  3 +-
 .../ClusterBasedJobCoordinator.java             |  2 +-
 .../samza/config/ClusterManagerConfig.java      | 29 ++++++++++++++------
 .../org/apache/samza/config/JobConfig.scala     |  5 ++++
 .../apache/samza/container/SamzaContainer.scala | 10 ++++---
 .../samza/job/local/ThreadJobFactory.scala      | 11 ++++++--
 6 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/docs/learn/documentation/versioned/jobs/samza-configurations.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 0928ee2..5828969 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -55,6 +55,7 @@ These are the basic properties for setting up a Samza application.
 |job.default.system| |__Required:__ The system-name to use for creating input or output streams for which the system is not explicitly configured. This property will also be used as default for `job.coordinator.system`, `task.checkpoint.system` and `job.changelog.system` if none are defined.|
 |task.class| |Used for legacy purposes; replace with `app.class` in new jobs. The fully-qualified name of the Java class which processes incoming messages from input streams. The class must implement [StreamTask](../api/javadocs/org/apache/samza/task/StreamTask.html) or [AsyncStreamTask](../api/javadocs/org/apache/samza/task/AsyncStreamTask.html), and may optionally implement [InitableTask](../api/javadocs/org/apache/samza/task/InitableTask.html), [ClosableTask](../api/javadocs/org/apache/samza/task/ClosableTask.html) and/or [WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html). The class will be instantiated several times, once for every input stream partition.|
 |job.host-affinity.enabled|false|This property indicates whether host-affinity is enabled or not. Host-affinity refers to the ability of Samza to request and allocate a container on the same host every time the job is deployed. When host-affinity is enabled, Samza makes a "best-effort" to honor the host-affinity constraint. The property `cluster-manager.container.request.timeout.ms` determines how long to wait before de-prioritizing the host-affinity constraint and assigning the container to any available resource.|
+|job.jmx.enabled|true|Determines whether a JMX server should be started on the job's JobCoordinator and Container. (true or false).|
 |task.window.ms|-1|If task.class implements [WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html), it can receive a windowing callback in regular intervals. This property specifies the time between window() calls, in milliseconds. If the number is negative (the default), window() is never called. A `window()` call will never occur concurrently with the processing of a message. If a message is being processed when a window() call is due, the invocation of window happens after processing the message. This property is set automatically when using join or window operators in a High Level API StreamApplication Note: task.window.ms should be set to be much larger than average process or window call duration to avoid starving regular processing.|
 |task.log4j.system| |Specify the system name for the StreamAppender. If this property is not specified in the config, an exception will be thrown. (See [Stream Log4j Appender](logging.html#stream-log4j-appender)) Example: task.log4j.system=kafka|
 |serializers.registry.<br>**_serde-name_**.class| |Use this property to register a serializer/deserializer, which defines a way of encoding data as an array of bytes (used for messages in streams, and for data in persistent storage). You can give a serde any serde-name you want, and reference that name in properties like systems.\*.samza.key.serde, systems.\*.samza.msg.serde, streams.\*.samza.key.serde, streams.\*.samza.msg.serde, stores.\*.key.serde and stores.\*.msg.serde. The value of this property is the fully-qualified name of a Java class that implements SerdeFactory. Samza ships with the following serde implementations:<br><br>`org.apache.samza.serializers.ByteSerdeFactory`<br>A no-op serde which passes through the undecoded byte array. <br><br>`org.apache.samza.serializers.ByteBufferSerdeFactory`<br>Encodes `java.nio.ByteBuffer` objects. <br><br>`org.apache.samza.serializers.IntegerSerdeFactory`<br>Encodes `java.lang.Integer` objects as binary (4 bytes fixed-length big-endian encoding).<br><br>`org.apache.samza.serializers.StringSerdeFactory`<br>Encodes `java.lang.String` objects as UTF-8. <br><br>`org.apache.samza.serializers.JsonSerdeFactory`<br>Encodes nested structures of `java.util.Map`, `java.util.List` etc. as JSON. Note: This Serde enforces a dash-separated property naming convention, while JsonSerdeV2 doesn't. This serde is primarily meant for Samza's internal usage, and is publicly available for backwards compatibility.<br><br>`org.apache.samza.serializers.JsonSerdeV2Factory`<br>Encodes nested structures of `java.util.Map`, `java.util.List` etc. as JSON. Note: This Serde uses Jackson's default (camelCase) property naming convention. This serde should be preferred over JsonSerde, especially in High Level API, unless the dasherized naming convention is required (e.g., for backwards compatibility).<br><br>`org.apache.samza.serializers.LongSerdeFactory`<br>Encodes `java.lang.Long` as binary (8 bytes fixed-length big-endian encoding).<br><br>`org.apache.samza.serializers.DoubleSerdeFactory`<br>Encodes `java.lang.Double` as binary (8 bytes double-precision float point). <br><br>`org.apache.samza.serializers.UUIDSerdeFactory`<br>Encodes `java.util.UUID` objects.<br><br>`org.apache.samza.serializers.SerializableSerdeFactory`<br>Encodes `java.io.Serializable` objects.<br><br>`org.apache.samza.serializers.MetricsSnapshotSerdeFactory`<br>Encodes `org.apache.samza.metrics.reporter.MetricsSnapshot` objects (which are used for reporting metrics) as JSON.<br><br>`org.apache.samza.serializers.KafkaSerdeFactory`<br>Adapter which allows existing `kafka.serializer.Encoder` and `kafka.serializer.Decoder` implementations to be used as Samza serdes. Set `serializers.registry.serde-name.encoder` and `serializers.registry.serde-name.decoder` to the appropriate class names.|
@@ -289,7 +290,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |--- |--- |--- |
 |cluster-manager.container.retry.count|8|If a container fails, it is automatically restarted by Samza. However, if a container keeps failing shortly after startup, that indicates a deeper problem, so we should kill the job rather than retrying indefinitely. This property determines the maximum number of times we are willing to restart a failed container in quick succession (the time period is configured with `cluster-manager.container.retry.window.ms`). Each container in the job is counted separately. If this property is set to 0, any failed container immediately causes the whole job to fail. If it is set to a negative number, there is no limit on the number of retries.|
 |cluster-manager.container.retry.window.ms|300000|This property determines how frequently a container is allowed to fail before we give up and fail the job. If the same container has failed more than `cluster-manager.container.retry.count` times, and the time between failures was less than this property `cluster-manager.container.retry.window.ms` (in milliseconds), then we fail the job. There is no limit to the number of times we will restart a container if the time between failures is greater than `cluster-manager.container.retry.window.ms`.|
-|cluster-manager.jobcoordinator.jmx.enabled|true|Determines whether a JMX server should be started on the job's JobCoordinator. (true or false).|
+|cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
 |cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 12e26f7..ff70df0 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -183,7 +183,7 @@ public class ClusterBasedJobCoordinator {
     systemAdmins = new SystemAdmins(config);
     partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
     clusterManagerConfig = new ClusterManagerConfig(config);
-    isJmxEnabled = clusterManagerConfig.getJmxEnabled();
+    isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
 
     jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval();
 


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index cb5d5c0..cb86a58 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -86,19 +86,27 @@ public class ClusterManagerConfig extends MapConfig {
   private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
 
   /**
-   * Determines whether a JMX server should be started on the job coordinator
-   * Default: true
-   */
-  public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
-  public static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
-
-  /**
    * The cluster managed job coordinator sleeps for a configurable time before checking again for termination.
    * The sleep interval of the cluster managed job coordinator.
    */
   public static final String CLUSTER_MANAGER_SLEEP_MS = "cluster-manager.jobcoordinator.sleep.interval.ms";
   private static final int DEFAULT_CLUSTER_MANAGER_SLEEP_MS = 1000;
 
+  /**
+   * Determines whether a JMX server should be started on JobCoordinator and SamzaContainer
+   * Default: true
+   */
+  private static final String JOB_JMX_ENABLED = "job.jmx.enabled";
+
+  /**
+   * Determines whether a JMX server should be started on the job coordinator
+   * Default: true
+   *
+   * @deprecated use {@code JOB_JMX_ENABLED} instead
+   */
+  private static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
+  private static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
+
   public ClusterManagerConfig(Config config) {
     super(config);
   }
@@ -189,12 +197,15 @@ public class ClusterManagerConfig extends MapConfig {
     return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
   }
 
-  public boolean getJmxEnabled() {
+  public boolean getJmxEnabledOnJobCoordinator() {
     if (containsKey(CLUSTER_MANAGER_JMX_ENABLED)) {
+      log.warn("Configuration {} is deprecated. Please use {}", CLUSTER_MANAGER_JMX_ENABLED, JOB_JMX_ENABLED);
       return getBoolean(CLUSTER_MANAGER_JMX_ENABLED);
     } else if (containsKey(AM_JMX_ENABLED)) {
-      log.info("Configuration {} is deprecated. Please use {}", AM_JMX_ENABLED, CLUSTER_MANAGER_JMX_ENABLED);
+      log.warn("Configuration {} is deprecated. Please use {}", AM_JMX_ENABLED, JOB_JMX_ENABLED);
       return getBoolean(AM_JMX_ENABLED);
+    } else if (containsKey(JOB_JMX_ENABLED)) {
+      return getBoolean(JOB_JMX_ENABLED);
     } else {
       return true;
     }


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4f19ade..2bc6420 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -45,6 +45,7 @@ object JobConfig {
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
   val JOB_DEFAULT_SYSTEM = "job.default.system"
+  val JOB_JMX_ENABLED = "job.jmx.enabled"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -208,4 +209,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getDiagnosticsAppenderClass = {
     getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
   }
+
+  def getJMXEnabled = {
+    getBoolean(JobConfig.JOB_JMX_ENABLED, true);
+  }
 }


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 3292986..87ab86b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -800,8 +800,9 @@ class SamzaContainer(
       val startTime = System.nanoTime()
       status = SamzaContainerStatus.STARTING
 
-
-      jmxServer = new JmxServer()
+      if (config.getJMXEnabled) {
+        jmxServer = new JmxServer()
+      }
 
       applicationContainerContextOption.foreach(_.start)
       startMetrics
@@ -840,8 +841,9 @@ class SamzaContainer(
       try {
         info("Shutting down SamzaContainer.")
         removeShutdownHook
-
-        jmxServer.stop
+        if (jmxServer != null) {
+          jmxServer.stop
+        }
 
         shutdownConsumers
         shutdownTask


http://git-wip-us.apache.org/repos/asf/samza/blob/6d20ee7e/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index ee6aff3..5a8d2f8 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -23,7 +23,7 @@ import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, TaskConfigJava}
+import org.apache.samza.config.{Config, JobConfig, TaskConfigJava}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
 import org.apache.samza.context.JobContextImpl
 import org.apache.samza.coordinator.JobModelManager
@@ -74,7 +74,10 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
 
     val containerId = "0"
-    val jmxServer = new JmxServer
+    var jmxServer: JmxServer = null
+    if (new JobConfig(config).getJMXEnabled) {
+      jmxServer = new JmxServer();
+    }
 
     val appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config)
     val taskFactory: TaskFactory[_] = TaskFactoryUtil.getTaskFactory(appDesc)
@@ -127,7 +130,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     } finally {
       coordinator.stop
       coordinatorStreamManager.stop()
-      jmxServer.stop
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
     }
   }
 }
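The flag resolution introduced by this commit can be exercised in isolation. The sketch below is not Samza code: `JmxConfigResolver` is a hypothetical class, configuration is modeled as a plain `Map<String, String>` instead of Samza's `Config`, and values are parsed with `Boolean.parseBoolean`, an assumption that may not match how `MapConfig.getBoolean` treats malformed values. It mirrors `ClusterManagerConfig.getJmxEnabledOnJobCoordinator()` (deprecated keys first, then `job.jmx.enabled`, default `true`) and `JobConfig.getJMXEnabled` (only `job.jmx.enabled`, default `true`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical helper, not part of Samza: mirrors the JMX flag resolution in this commit.
public class JmxConfigResolver {
  private static final Logger LOG = Logger.getLogger(JmxConfigResolver.class.getName());

  // Config keys copied from the diff.
  public static final String JOB_JMX_ENABLED = "job.jmx.enabled";
  public static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
  public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";

  // Assumption: "true" (any case) means enabled; any other value means disabled.
  private static boolean parse(String value) {
    return Boolean.parseBoolean(value);
  }

  // Mirrors getJmxEnabledOnJobCoordinator(): deprecated keys are checked before job.jmx.enabled.
  public static boolean jmxEnabledOnJobCoordinator(Map<String, String> config) {
    if (config.containsKey(CLUSTER_MANAGER_JMX_ENABLED)) {
      LOG.warning("Configuration " + CLUSTER_MANAGER_JMX_ENABLED + " is deprecated. Please use " + JOB_JMX_ENABLED);
      return parse(config.get(CLUSTER_MANAGER_JMX_ENABLED));
    } else if (config.containsKey(AM_JMX_ENABLED)) {
      LOG.warning("Configuration " + AM_JMX_ENABLED + " is deprecated. Please use " + JOB_JMX_ENABLED);
      return parse(config.get(AM_JMX_ENABLED));
    } else if (config.containsKey(JOB_JMX_ENABLED)) {
      return parse(config.get(JOB_JMX_ENABLED));
    }
    return true; // default when none of the keys is set
  }

  // Mirrors JobConfig.getJMXEnabled: only job.jmx.enabled is read, default true.
  public static boolean jmxEnabledOnContainer(Map<String, String> config) {
    String value = config.get(JOB_JMX_ENABLED);
    return value == null || parse(value);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    System.out.println(jmxEnabledOnJobCoordinator(config) + " " + jmxEnabledOnContainer(config)); // true true

    config.put(JOB_JMX_ENABLED, "false");
    System.out.println(jmxEnabledOnJobCoordinator(config) + " " + jmxEnabledOnContainer(config)); // false false

    // A leftover deprecated key overrides job.jmx.enabled on the JobCoordinator only.
    config.put(CLUSTER_MANAGER_JMX_ENABLED, "true");
    System.out.println(jmxEnabledOnJobCoordinator(config) + " " + jmxEnabledOnContainer(config)); // true false
  }
}
```

One consequence visible in the diff: if a job still sets `cluster-manager.jobcoordinator.jmx.enabled` or `yarn.am.jmx.enabled`, that value wins over `job.jmx.enabled` on the JobCoordinator, while `SamzaContainer` and `ThreadJobFactory` read only `job.jmx.enabled`. When the flag resolves to `false`, no `JmxServer` is created, which is why both shutdown paths now check `jmxServer != null` before calling `stop`.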