http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/Config.java 
b/jstorm-core/src/main/java/backtype/storm/Config.java
index 4273908..0e04b95 100644
--- a/jstorm-core/src/main/java/backtype/storm/Config.java
+++ b/jstorm-core/src/main/java/backtype/storm/Config.java
@@ -17,10 +17,8 @@
  */
 package backtype.storm;
 
-import backtype.storm.ConfigValidation;
 import backtype.storm.serialization.IKryoDecorator;
 import backtype.storm.serialization.IKryoFactory;
-
 import com.esotericsoftware.kryo.Serializer;
 
 import java.util.ArrayList;
@@ -29,28 +27,25 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Topology configs are specified as a plain old map. This class provides a
- * convenient way to create a topology config map by providing setter methods 
for
- * all the configs that can be set. It also makes it easier to do things like 
add
- * serializations.
- *
- * <p>This class also provides constants for all the configurations possible on
- * a Storm cluster and Storm topology. Each constant is paired with a schema
- * that defines the validity criterion of the corresponding field. Default
- * values for these configs can be found in defaults.yaml.</p>
- *
- * <p>Note that you may put other configurations in any of the configs. Storm
- * will ignore anything it doesn't recognize, but your topologies are free to 
make
- * use of them by reading them in the prepare method of Bolts or the open 
method of
- * Spouts.</p>
+ * Topology configs are specified as a plain old map. This class provides a 
convenient way to create a topology config map by providing setter methods for 
all
+ * the configs that can be set. It also makes it easier to do things like add 
serializations.
+ * <p/>
+ * <p>
+ * This class also provides constants for all the configurations possible on a 
Storm cluster and Storm topology. Each constant is paired with a schema that
+ * defines the validity criterion of the corresponding field. Default values 
for these configs can be found in defaults.yaml.
+ * </p>
+ * <p/>
+ * <p>
+ * Note that you may put other configurations in any of the configs. Storm 
will ignore anything it doesn't recognize, but your topologies are free to make 
use
+ * of them by reading them in the prepare method of Bolts or the open method 
of Spouts.
+ * </p>
  */
 public class Config extends HashMap<String, Object> {
-    //DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS
+    // DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS
     private static final long serialVersionUID = -1550278723792864455L;
 
     /**
-     * This is part of a temporary workaround to a ZK bug, it is the 
'scheme:acl' for
-     * the user Nimbus and Supervisors use to authenticate with ZK.
+     * This is part of a temporary workaround to a ZK bug, it is the 
'scheme:acl' for the user Nimbus and Supervisors use to authenticate with ZK.
      */
     public static final String STORM_ZOOKEEPER_SUPERACL = 
"storm.zookeeper.superACL";
     public static final Object STORM_ZOOKEEPER_SUPERACL_SCHEMA = String.class;
@@ -104,7 +99,8 @@ public class Config extends HashMap<String, Object> {
     public static final Object 
STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * If the Netty messaging layer is busy, the Netty client will try to 
batch message as more as possible up to the size of 
STORM_NETTY_MESSAGE_BATCH_SIZE bytes
+     * If the Netty messaging layer is busy, the Netty client will try to 
batch as many messages as possible, up to a total size of 
STORM_NETTY_MESSAGE_BATCH_SIZE
+     * bytes
      */
     public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = 
"storm.messaging.netty.transfer.batch.size";
     public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = 
ConfigValidation.IntegerValidator;
@@ -122,8 +118,8 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = 
Boolean.class;
 
     /**
-     * The delegate for serializing metadata, should be used for serialized 
objects stored in zookeeper and on disk.
-     * This is NOT used for compressing serialized tuples sent between 
topologies.
+     * The delegate for serializing metadata, should be used for serialized 
objects stored in zookeeper and on disk. This is NOT used for compressing 
serialized
+     * tuples sent between topologies.
      */
     public static final String STORM_META_SERIALIZATION_DELEGATE = 
"storm.meta.serialization.delegate";
     public static final Object STORM_META_SERIALIZATION_DELEGATE_SCHEMA = 
String.class;
@@ -141,16 +137,15 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * A directory on the local filesystem used by Storm for any local
-     * filesystem usage it needs. The directory must exist and the Storm 
daemons must
-     * have permission to read/write from this location.
+     * A directory on the local filesystem used by Storm for any local 
filesystem usage it needs. The directory must exist and the Storm daemons must 
have
+     * permission to read/write from this location.
      */
     public static final String STORM_LOCAL_DIR = "storm.local.dir";
     public static final Object STORM_LOCAL_DIR_SCHEMA = String.class;
 
     /**
      * A global task scheduler used to assign topologies's tasks to 
supervisors' wokers.
-     *
+     * <p/>
      * If this is not set, a default system scheduler will be used.
      */
     public static final String STORM_SCHEDULER = "storm.scheduler";
@@ -163,11 +158,10 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
     /**
-     * The hostname the supervisors/workers should report to nimbus. If unset, 
Storm will
-     * get the hostname to report by calling 
<code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
-     *
-     * You should set this config when you dont have a DNS which 
supervisors/workers
-     * can utilize to find each other based on hostname got from calls to
+     * The hostname the supervisors/workers should report to nimbus. If unset, 
Storm will get the hostname to report by calling
+     * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
+     * <p/>
+     * You should set this config when you don't have a DNS which 
supervisors/workers can utilize to find each other based on hostname got from 
calls to
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
      */
     public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
@@ -198,25 +192,22 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = 
String.class;
 
     /**
-     * The serializer class for ListDelegate (tuple payload).
-     * The default serializer will be ListDelegateSerializer
+     * The serializer class for ListDelegate (tuple payload). The default 
serializer will be ListDelegateSerializer
      */
     public static final String TOPOLOGY_TUPLE_SERIALIZER = 
"topology.tuple.serializer";
     public static final Object TOPOLOGY_TUPLE_SERIALIZER_SCHEMA = String.class;
 
     /**
-     * Try to serialize all tuples, even for local transfers.  This should 
only be used
-     * for testing, as a sanity check that all of your tuples are setup 
properly.
+     * Try to serialize all tuples, even for local transfers. This should only 
be used for testing, as a sanity check that all of your tuples are setup
+     * properly.
      */
     public static final String TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE = 
"topology.testing.always.try.serialize";
     public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = 
Boolean.class;
 
     /**
-     * Whether or not to use ZeroMQ for messaging in local mode. If this is set
-     * to false, then Storm will use a pure-Java messaging system. The purpose
-     * of this flag is to make it easy to run Storm in local mode by 
eliminating
-     * the need for native dependencies, which can be difficult to install.
-     *
+     * Whether or not to use ZeroMQ for messaging in local mode. If this is 
set to false, then Storm will use a pure-Java messaging system. The purpose of 
this
+     * flag is to make it easy to run Storm in local mode by eliminating the 
need for native dependencies, which can be difficult to install.
+     * <p/>
      * Defaults to false.
      */
     public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq";
@@ -243,49 +234,45 @@ public class Config extends HashMap<String, Object> {
     /**
      * The number of times to retry a Zookeeper operation.
      */
-    public static final String 
STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times";
+    public static final String STORM_ZOOKEEPER_RETRY_TIMES = 
"storm.zookeeper.retry.times";
     public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The interval between retries of a Zookeeper operation.
      */
-    public static final String 
STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
+    public static final String STORM_ZOOKEEPER_RETRY_INTERVAL = 
"storm.zookeeper.retry.interval";
     public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The ceiling of the interval between retries of a Zookeeper operation.
      */
-    public static final String 
STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
+    public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING = 
"storm.zookeeper.retry.intervalceiling.millis";
     public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The cluster Zookeeper authentication scheme to use, e.g. "digest". 
Defaults to no authentication.
      */
-    public static final String 
STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
+    public static final String STORM_ZOOKEEPER_AUTH_SCHEME = 
"storm.zookeeper.auth.scheme";
     public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = 
String.class;
 
     /**
-     * A string representing the payload for cluster Zookeeper authentication.
-     * It gets serialized using UTF-8 encoding during authentication.
-     * Note that if this is set to something with a secret (as when using
-     * digest authentication) then it should only be set in the
-     * storm-cluster-auth.yaml file.
-     * This file storm-cluster-auth.yaml should then be protected with
-     * appropriate permissions that deny access from workers.
+     * A string representing the payload for cluster Zookeeper authentication. 
It gets serialized using UTF-8 encoding during authentication. Note that if this
+     * is set to something with a secret (as when using digest authentication) 
then it should only be set in the storm-cluster-auth.yaml file. This file
+     * storm-cluster-auth.yaml should then be protected with appropriate 
permissions that deny access from workers.
      */
-    public static final String 
STORM_ZOOKEEPER_AUTH_PAYLOAD="storm.zookeeper.auth.payload";
+    public static final String STORM_ZOOKEEPER_AUTH_PAYLOAD = 
"storm.zookeeper.auth.payload";
     public static final Object STORM_ZOOKEEPER_AUTH_PAYLOAD_SCHEMA = 
String.class;
 
     /**
      * The topology Zookeeper authentication scheme to use, e.g. "digest". 
Defaults to no authentication.
      */
-    public static final String 
STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME="storm.zookeeper.topology.auth.scheme";
+    public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME = 
"storm.zookeeper.topology.auth.scheme";
     public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME_SCHEMA = 
String.class;
 
     /**
      * A string representing the payload for topology Zookeeper 
authentication. It gets serialized using UTF-8 encoding during authentication.
      */
-    public static final String 
STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD="storm.zookeeper.topology.auth.payload";
+    public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD = 
"storm.zookeeper.topology.auth.payload";
     public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD_SCHEMA = 
String.class;
 
     /**
@@ -298,19 +285,19 @@ public class Config extends HashMap<String, Object> {
     /**
      * The number of times to retry a Nimbus operation.
      */
-    public static final String 
STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times";
+    public static final String STORM_NIMBUS_RETRY_TIMES = 
"storm.nimbus.retry.times";
     public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class;
 
     /**
      * The starting interval between exponential backoff retries of a Nimbus 
operation.
      */
-    public static final String 
STORM_NIMBUS_RETRY_INTERVAL="storm.nimbus.retry.interval.millis";
+    public static final String STORM_NIMBUS_RETRY_INTERVAL = 
"storm.nimbus.retry.interval.millis";
     public static final Object STORM_NIMBUS_RETRY_INTERVAL_SCHEMA = 
Number.class;
 
     /**
      * The ceiling of the interval between retries of a client connect to 
Nimbus operation.
      */
-    public static final String 
STORM_NIMBUS_RETRY_INTERVAL_CEILING="storm.nimbus.retry.intervalceiling.millis";
+    public static final String STORM_NIMBUS_RETRY_INTERVAL_CEILING = 
"storm.nimbus.retry.intervalceiling.millis";
     public static final Object STORM_NIMBUS_RETRY_INTERVAL_CEILING_SCHEMA = 
Number.class;
 
     /**
@@ -326,8 +313,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = 
String.class;
 
     /**
-     * Which port the Thrift interface of Nimbus should run on. Clients should
-     * connect to this port to upload jars and submit topologies.
+     * Which port the Thrift interface of Nimbus should run on. Clients should 
connect to this port to upload jars and submit topologies.
      */
     public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
     public static final Object NIMBUS_THRIFT_PORT_SCHEMA = 
ConfigValidation.IntegerValidator;
@@ -339,30 +325,29 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_THRIFT_THREADS_SCHEMA = Number.class;
 
     /**
-     * A list of users that are cluster admins and can run any command.  To 
use this set
-     * nimbus.authorizer to 
backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * A list of users that are cluster admins and can run any command. To use 
this set nimbus.authorizer to
+     * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_ADMINS = "nimbus.admins";
     public static final Object NIMBUS_ADMINS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * A list of users that are the only ones allowed to run user operation on 
storm cluster.
-     * To use this set nimbus.authorizer to 
backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * A list of users that are the only ones allowed to run user operation on 
storm cluster. To use this set nimbus.authorizer to
+     * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_USERS = "nimbus.users";
     public static final Object NIMBUS_USERS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * A list of groups , users belong to these groups are the only ones 
allowed to run user operation on storm cluster.
-     * To use this set nimbus.authorizer to 
backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * A list of groups; users belonging to these groups are the only ones 
allowed to run user operations on the storm cluster. To use this, set 
nimbus.authorizer to
+     * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_GROUPS = "nimbus.groups";
     public static final Object NIMBUS_GROUPS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * A list of users that run the supervisors and should be authorized to 
interact with
-     * nimbus as a supervisor would.  To use this set
-     * nimbus.authorizer to 
backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * A list of users that run the supervisors and should be authorized to 
interact with nimbus as a supervisor would. To use this set nimbus.authorizer to
+     * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String NIMBUS_SUPERVISOR_USERS = 
"nimbus.supervisor.users";
     public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = 
ConfigValidation.StringsValidator;
@@ -374,83 +359,70 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * This parameter is used by the storm-deploy project to configure the
-     * jvm options for the nimbus daemon.
+     * This parameter is used by the storm-deploy project to configure the jvm 
options for the nimbus daemon.
      */
     public static final String NIMBUS_CHILDOPTS = "nimbus.childopts";
     public static final Object NIMBUS_CHILDOPTS_SCHEMA = String.class;
 
-
     /**
-     * How long without heartbeating a task can go before nimbus will consider 
the
-     * task dead and reassign it to another location.
+     * How long without heartbeating a task can go before nimbus will consider 
the task dead and reassign it to another location.
      */
     public static final String NIMBUS_TASK_TIMEOUT_SECS = 
"nimbus.task.timeout.secs";
     public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
-     * How often nimbus should wake up to check heartbeats and do 
reassignments. Note
-     * that if a machine ever goes down Nimbus will immediately wake up and 
take action.
-     * This parameter is for checking for failures when there's no explicit 
event like that
-     * occuring.
+     * How often nimbus should wake up to check heartbeats and do 
reassignments. Note that if a machine ever goes down Nimbus will immediately 
wake up and take
+     * action. This parameter is for checking for failures when there's no 
explicit event like that occurring.
      */
     public static final String NIMBUS_MONITOR_FREQ_SECS = 
"nimbus.monitor.freq.secs";
     public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * How often nimbus should wake the cleanup thread to clean the inbox.
-     * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
      */
     public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = 
"nimbus.cleanup.inbox.freq.secs";
     public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The length of time a jar file lives in the inbox before being deleted 
by the cleanup thread.
-     *
-     * Probably keep this value greater than or equal to 
NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS.
-     * Note that the time it takes to delete an inbox jar file is going to be 
somewhat more than
-     * NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often 
NIMBUS_CLEANUP_FREQ_SECS
-     * is set to).
-     * @see NIMBUS_CLEANUP_FREQ_SECS
+     * <p/>
+     * Probably keep this value greater than or equal to 
NIMBUS_CLEANUP_INBOX_FREQ_SECS. Note that the time it takes to delete 
an inbox jar file is
+     * going to be somewhat more than NIMBUS_INBOX_JAR_EXPIRATION_SECS 
(depending on what NIMBUS_CLEANUP_INBOX_FREQ_SECS is set to).
      */
     public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = 
"nimbus.inbox.jar.expiration.secs";
     public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * How long before a supervisor can go without heartbeating before nimbus 
considers it dead
-     * and stops assigning new work to it.
+     * How long before a supervisor can go without heartbeating before nimbus 
considers it dead and stops assigning new work to it.
      */
     public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = 
"nimbus.supervisor.timeout.secs";
     public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * A special timeout used when a task is initially launched. During 
launch, this is the timeout
-     * used until the first heartbeat, overriding nimbus.task.timeout.secs.
-     *
-     * <p>A separate timeout exists for launch because there can be quite a 
bit of overhead
-     * to launching new JVM's and configuring them.</p>
+     * A special timeout used when a task is initially launched. During 
launch, this is the timeout used until the first heartbeat, overriding
+     * nimbus.task.timeout.secs.
+     * <p/>
+     * <p>
+     * A separate timeout exists for launch because there can be quite a bit 
of overhead to launching new JVM's and configuring them.
+     * </p>
      */
     public static final String NIMBUS_TASK_LAUNCH_SECS = 
"nimbus.task.launch.secs";
     public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * Whether or not nimbus should reassign tasks if it detects that a task 
goes down.
-     * Defaults to true, and it's not recommended to change this value.
+     * Whether or not nimbus should reassign tasks if it detects that a task 
goes down. Defaults to true, and it's not recommended to change this value.
      */
     public static final String NIMBUS_REASSIGN = "nimbus.reassign";
     public static final Object NIMBUS_REASSIGN_SCHEMA = Boolean.class;
 
     /**
-     * During upload/download with the master, how long an upload or download 
connection is idle
-     * before nimbus considers it dead and drops the connection.
+     * During upload/download with the master, how long an upload or download 
connection is idle before nimbus considers it dead and drops the connection.
      */
     public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = 
"nimbus.file.copy.expiration.secs";
     public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * A custom class that implements ITopologyValidator that is run whenever a
-     * topology is submitted. Can be used to provide business-specific logic 
for
+     * A custom class that implements ITopologyValidator that is run whenever 
a topology is submitted. Can be used to provide business-specific logic for
      * whether topologies are allowed to run or not.
      */
     public static final String NIMBUS_TOPOLOGY_VALIDATOR = 
"nimbus.topology.validator";
@@ -462,14 +434,12 @@ public class Config extends HashMap<String, Object> {
     public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
     public static final Object NIMBUS_AUTHORIZER_SCHEMA = String.class;
 
-
     /**
      * Impersonation user ACL config entries.
      */
     public static final String NIMBUS_IMPERSONATION_AUTHORIZER = 
"nimbus.impersonation.authorizer";
     public static final Object NIMBUS_IMPERSONATION_AUTHORIZER_SCHEMA = 
String.class;
 
-
     /**
      * Impersonation user ACL config entries.
      */
@@ -489,8 +459,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_CREDENTIAL_RENEWERS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * A list of plugins that nimbus should load during submit topology to 
populate
-     * credentials on user's behalf.
+     * A list of plugins that nimbus should load during submit topology to 
populate credentials on user's behalf.
      */
     public static final String NIMBUS_AUTO_CRED_PLUGINS = 
"nimbus.autocredential.plugins.classes";
     public static final Object NIMBUS_AUTO_CRED_PLUGINS_SCHEMA = 
ConfigValidation.StringsValidator;
@@ -592,8 +561,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object UI_HTTPS_KEYSTORE_PASSWORD_SCHEMA = 
String.class;
 
     /**
-     * Type of keystore used by Storm UI for setting up HTTPS (SSL).
-     * see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more 
details.
+     * Type of keystore used by Storm UI for setting up HTTPS (SSL). see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more 
details.
      */
     public static final String UI_HTTPS_KEYSTORE_TYPE = 
"ui.https.keystore.type";
     public static final Object UI_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
@@ -617,8 +585,8 @@ public class Config extends HashMap<String, Object> {
     public static final Object UI_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = 
String.class;
 
     /**
-     * Type of truststore used by Storm UI for setting up HTTPS (SSL).
-     * see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more 
details.
+     * Type of truststore used by Storm UI for setting up HTTPS (SSL). see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more
+     * details.
      */
     public static final String UI_HTTPS_TRUSTSTORE_TYPE = 
"ui.https.truststore.type";
     public static final Object UI_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class;
@@ -632,7 +600,6 @@ public class Config extends HashMap<String, Object> {
     public static final String UI_HTTPS_NEED_CLIENT_AUTH = 
"ui.https.need.client.auth";
     public static final Object UI_HTTPS_NEED_CLIENT_AUTH_SCHEMA = 
Boolean.class;
 
-
     /**
      * List of DRPC servers so that the DRPCSpout knows who to talk to.
      */
@@ -664,8 +631,8 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_HTTPS_KEYSTORE_PASSWORD_SCHEMA = 
String.class;
 
     /**
-     * Type of keystore used by Storm DRPC for setting up HTTPS (SSL).
-     * see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more 
details.
+     * Type of keystore used by Storm DRPC for setting up HTTPS (SSL). see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more
+     * details.
      */
     public static final String DRPC_HTTPS_KEYSTORE_TYPE = 
"drpc.https.keystore.type";
     public static final Object DRPC_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class;
@@ -689,8 +656,8 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = 
String.class;
 
     /**
-     * Type of truststore used by Storm DRPC for setting up HTTPS (SSL).
-     * see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more 
details.
+     * Type of truststore used by Storm DRPC for setting up HTTPS (SSL). see 
http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more
+     * details.
      */
     public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = 
"drpc.https.truststore.type";
     public static final Object DRPC_HTTPS_TRUSTSTORE_TYPE_SCHEMA = 
String.class;
@@ -724,26 +691,20 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * The Access Control List for the DRPC Authorizer.
-     * @see DRPCSimpleAclAuthorizer
      */
     public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl";
     public static final Object DRPC_AUTHORIZER_ACL_SCHEMA = Map.class;
 
     /**
      * File name of the DRPC Authorizer ACL.
-     * @see DRPCSimpleAclAuthorizer
      */
     public static final String DRPC_AUTHORIZER_ACL_FILENAME = 
"drpc.authorizer.acl.filename";
     public static final Object DRPC_AUTHORIZER_ACL_FILENAME_SCHEMA = 
String.class;
 
     /**
-     * Whether the DRPCSimpleAclAuthorizer should deny requests for operations
-     * involving functions that have no explicit ACL entry. When set to false
-     * (the default) DRPC functions that have no entry in the ACL will be
-     * permitted, which is appropriate for a development environment. When set
-     * to true, explicit ACL entries are required for every DRPC function, and
-     * any request for functions will be denied.
-     * @see DRPCSimpleAclAuthorizer
+     * Whether the DRPCSimpleAclAuthorizer should deny requests for operations 
involving functions that have no explicit ACL entry. When set to false (the
+     * default) DRPC functions that have no entry in the ACL will be 
permitted, which is appropriate for a development environment. When set to 
true, explicit
+     * ACL entries are required for every DRPC function, and any request for 
functions will be denied.
      */
     public static final String DRPC_AUTHORIZER_ACL_STRICT = 
"drpc.authorizer.acl.strict";
     public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = 
Boolean.class;
@@ -785,11 +746,10 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class;
 
     /**
-     * The timeout on DRPC requests within the DRPC server. Defaults to 10 
minutes. Note that requests can also
-     * timeout based on the socket timeout on the DRPC client, and separately 
based on the topology message
-     * timeout for the topology implementing the DRPC function.
+     * The timeout on DRPC requests within the DRPC server. Defaults to 10 
minutes. Note that requests can also timeout based on the socket timeout on the 
DRPC
+     * client, and separately based on the topology message timeout for the 
topology implementing the DRPC function.
      */
-    public static final String DRPC_REQUEST_TIMEOUT_SECS  = 
"drpc.request.timeout.secs";
+    public static final String DRPC_REQUEST_TIMEOUT_SECS = 
"drpc.request.timeout.secs";
     public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
@@ -816,9 +776,8 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_SCHEDULER_META = 
"supervisor.scheduler.meta";
     public static final Object SUPERVISOR_SCHEDULER_META_SCHEMA = Map.class;
     /**
-     * A list of ports that can run workers on this supervisor. Each worker 
uses one port, and
-     * the supervisor will only run one worker per port. Use this 
configuration to tune
-     * how many workers run on each machine.
+     * A list of ports that can run workers on this supervisor. Each worker 
uses one port, and the supervisor will only run one worker per port. Use this
+     * configuration to tune how many workers run on each machine.
      */
     public static final String SUPERVISOR_SLOTS_PORTS = 
"supervisor.slots.ports";
     public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = 
ConfigValidation.IntegersValidator;
@@ -836,8 +795,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_HTTP_FILTER_SCHEMA = String.class;
 
     /**
-     * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
-     * service
+     * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP 
service
      */
     public static final String DRPC_HTTP_FILTER_PARAMS = 
"drpc.http.filter.params";
     public static final Object DRPC_HTTP_FILTER_PARAMS_SCHEMA = Map.class;
@@ -849,15 +807,13 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_EXECUTORS_PER_TOPOLOGY_SCHEMA = 
Number.class;
 
     /**
-     * This parameter is used by the storm-deploy project to configure the
-     * jvm options for the supervisor daemon.
+     * This parameter is used by the storm-deploy project to configure the jvm 
options for the supervisor daemon.
      */
     public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
     public static final Object SUPERVISOR_CHILDOPTS_SCHEMA = String.class;
 
     /**
-     * How long a worker can go without heartbeating before the supervisor 
tries to
-     * restart the worker process.
+     * How long a worker can go without heartbeating before the supervisor 
tries to restart the worker process.
      */
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = 
"supervisor.worker.timeout.secs";
     public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
@@ -869,18 +825,15 @@ public class Config extends HashMap<String, Object> {
     public static final Object SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * How long a worker can go without heartbeating during the initial launch 
before
-     * the supervisor tries to restart the worker process. This value override
-     * supervisor.worker.timeout.secs during launch because there is additional
-     * overhead to starting and configuring the JVM on launch.
+     * How long a worker can go without heartbeating during the initial launch 
before the supervisor tries to restart the worker process. This value overrides
+     * supervisor.worker.timeout.secs during launch because there is 
additional overhead to starting and configuring the JVM on launch.
      */
     public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = 
"supervisor.worker.start.timeout.secs";
     public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * Whether or not the supervisor should launch workers assigned to it. 
Defaults
-     * to true -- and you should probably never change this value. This 
configuration
-     * is used in the Storm unit tests.
+     * Whether or not the supervisor should launch workers assigned to it. 
Defaults to true -- and you should probably never change this value. This
+     * configuration is used in the Storm unit tests.
      */
     public static final String SUPERVISOR_ENABLE = "supervisor.enable";
     public static final Object SUPERVISOR_ENABLE_SCHEMA = Boolean.class;
@@ -891,42 +844,34 @@ public class Config extends HashMap<String, Object> {
     public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = 
"supervisor.heartbeat.frequency.secs";
     public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
-     * How often the supervisor checks the worker heartbeats to see if any of 
them
-     * need to be restarted.
+     * How often the supervisor checks the worker heartbeats to see if any of 
them need to be restarted.
      */
     public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = 
"supervisor.monitor.frequency.secs";
     public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * Should the supervior try to run the worker as the lauching user or not. 
 Defaults to false.
+     * Should the supervisor try to run the worker as the launching user or not. 
Defaults to false.
      */
     public static final String SUPERVISOR_RUN_WORKER_AS_USER = 
"supervisor.run.worker.as.user";
     public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = 
Boolean.class;
 
     /**
-     * Full path to the worker-laucher executable that will be used to lauch 
workers when
-     * SUPERVISOR_RUN_WORKER_AS_USER is set to true.
+     * Full path to the worker-launcher executable that will be used to launch 
workers when SUPERVISOR_RUN_WORKER_AS_USER is set to true.
      */
     public static final String SUPERVISOR_WORKER_LAUNCHER = 
"supervisor.worker.launcher";
     public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = 
String.class;
 
     /**
-     * The jvm opts provided to workers launched by this supervisor. All 
"%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%"
-     * and "%WORKER-PORT%" substrings are replaced with:
-     * %ID%          -> port (for backward compatibility),
-     * %WORKER-ID%   -> worker-id,
-     * %TOPOLOGY-ID%    -> topology-id,
-     * %WORKER-PORT% -> port.
+     * The jvm opts provided to workers launched by this supervisor. All 
"%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%" and "%WORKER-PORT%" substrings are 
replaced
+     * with: %ID% -> port (for backward compatibility), %WORKER-ID% -> 
worker-id, %TOPOLOGY-ID% -> topology-id, %WORKER-PORT% -> port.
      */
     public static final String WORKER_CHILDOPTS = "worker.childopts";
     public static final Object WORKER_CHILDOPTS_SCHEMA = 
ConfigValidation.StringOrStringListValidator;
 
     /**
-     * The jvm opts provided to workers launched by this supervisor for GC. 
All "%ID%" substrings are replaced
-     * with an identifier for this worker.  Because the JVM complains about 
multiple GC opts the topology
-     * can override this default value by setting topology.worker.gc.childopts.
+     * The jvm opts provided to workers launched by this supervisor for GC. 
All "%ID%" substrings are replaced with an identifier for this worker. Because 
the
+     * JVM complains about multiple GC opts the topology can override this 
default value by setting topology.worker.gc.childopts.
      */
     public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts";
     public static final Object WORKER_GC_CHILDOPTS_SCHEMA = 
ConfigValidation.StringOrStringListValidator;
@@ -949,42 +894,37 @@ public class Config extends HashMap<String, Object> {
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = 
"task.heartbeat.frequency.secs";
     public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
-     * How often a task should sync its connections with other tasks (if a 
task is
-     * reassigned, the other tasks sending messages to it need to refresh 
their connections).
-     * In general though, when a reassignment happens other tasks will be 
notified
-     * almost immediately. This configuration is here just in case that 
notification doesn't
-     * come through.
+     * How often a task should sync its connections with other tasks (if a 
task is reassigned, the other tasks sending messages to it need to refresh their
+     * connections). In general though, when a reassignment happens other 
tasks will be notified almost immediately. This configuration is here just in 
case
+     * that notification doesn't come through.
      */
     public static final String TASK_REFRESH_POLL_SECS = 
"task.refresh.poll.secs";
     public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
      * How often a task should sync credentials, worst case.
      */
     public static final String TASK_CREDENTIALS_POLL_SECS = 
"task.credentials.poll.secs";
     public static final Object TASK_CREDENTIALS_POLL_SECS_SCHEMA = 
Number.class;
 
-
     /**
-     * A list of users that are allowed to interact with the topology.  To use 
this set
-     * nimbus.authorizer to 
backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * A list of users that are allowed to interact with the topology. To use 
this set nimbus.authorizer to
+     * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String TOPOLOGY_USERS = "topology.users";
     public static final Object TOPOLOGY_USERS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * A list of groups that are allowed to interact with the topology.  To 
use this set
-     * nimbus.authorizer to 
backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
+     * A list of groups that are allowed to interact with the topology. To use 
this set nimbus.authorizer to
+     * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
     public static final String TOPOLOGY_GROUPS = "topology.groups";
     public static final Object TOPOLOGY_GROUPS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * True if Storm should timeout messages or not. Defaults to true. This is 
meant to be used
-     * in unit tests to prevent tuples from being accidentally timed out 
during the test.
+     * True if Storm should timeout messages or not. Defaults to true. This is 
meant to be used in unit tests to prevent tuples from being accidentally timed
+     * out during the test.
      */
     public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = 
"topology.enable.message.timeouts";
     public static final Object TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS_SCHEMA = 
Boolean.class;
@@ -996,27 +936,22 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_DEBUG_SCHEMA = Boolean.class;
 
     /**
-     * The serializer for communication between shell components and non-JVM
-     * processes
+     * The serializer for communication between shell components and non-JVM 
processes
      */
     public static final String TOPOLOGY_MULTILANG_SERIALIZER = 
"topology.multilang.serializer";
     public static final Object TOPOLOGY_MULTILANG_SERIALIZER_SCHEMA = 
String.class;
 
     /**
-     * How many processes should be spawned around the cluster to execute this
-     * topology. Each process will execute some number of tasks as threads 
within
-     * them. This parameter should be used in conjunction with the parallelism 
hints
-     * on each component in the topology to tune the performance of a topology.
+     * How many processes should be spawned around the cluster to execute this 
topology. Each process will execute some number of tasks as threads within them.
+     * This parameter should be used in conjunction with the parallelism hints 
on each component in the topology to tune the performance of a topology.
      */
     public static final String TOPOLOGY_WORKERS = "topology.workers";
     public static final Object TOPOLOGY_WORKERS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * How many instances to create for a spout/bolt. A task runs on a thread 
with zero or more
-     * other tasks for the same spout/bolt. The number of tasks for a 
spout/bolt is always
-     * the same throughout the lifetime of a topology, but the number of 
executors (threads) for
-     * a spout/bolt can change over time. This allows a topology to scale to 
more or less resources
-     * without redeploying the topology or violating the constraints of Storm 
(such as a fields grouping
+     * How many instances to create for a spout/bolt. A task runs on a thread 
with zero or more other tasks for the same spout/bolt. The number of tasks for a
+     * spout/bolt is always the same throughout the lifetime of a topology, 
but the number of executors (threads) for a spout/bolt can change over time. 
This
+     * allows a topology to scale to more or less resources without 
redeploying the topology or violating the constraints of Storm (such as a 
fields grouping
      * guaranteeing that the same value goes to the same task).
      */
     public static final String TOPOLOGY_TASKS = "topology.tasks";
@@ -1024,266 +959,242 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * How many executors to spawn for ackers.
-     *
-     * <p>If this is set to 0, then Storm will immediately ack tuples as soon
-     * as they come off the spout, effectively disabling reliability.</p>
+     * <p/>
+     * <p>
+     * If this is set to 0, then Storm will immediately ack tuples as soon as 
they come off the spout, effectively disabling reliability.
+     * </p>
      */
     public static final String TOPOLOGY_ACKER_EXECUTORS = 
"topology.acker.executors";
     public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
-     * The maximum amount of time given to the topology to fully process a 
message
-     * emitted by a spout. If the message is not acked within this time frame, 
Storm
-     * will fail the message on the spout. Some spouts implementations will 
then replay
-     * the message at a later time.
+     * The maximum amount of time given to the topology to fully process a 
message emitted by a spout. If the message is not acked within this time frame, 
Storm
+     * will fail the message on the spout. Some spout implementations will 
then replay the message at a later time.
      */
     public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = 
"topology.message.timeout.secs";
     public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * A list of serialization registrations for Kryo ( 
http://code.google.com/p/kryo/ ),
-     * the underlying serialization framework for Storm. A serialization can 
either
-     * be the name of a class (in which case Kryo will automatically create a 
serializer for the class
-     * that saves all the object's fields), or an implementation of 
com.esotericsoftware.kryo.Serializer.
-     *
+     * A list of serialization registrations for Kryo ( 
http://code.google.com/p/kryo/ ), the underlying serialization framework for 
Storm. A serialization can
+     * either be the name of a class (in which case Kryo will automatically 
create a serializer for the class that saves all the object's fields), or an
+     * implementation of com.esotericsoftware.kryo.Serializer.
+     * <p/>
      * See Kryo's documentation for more information about writing custom 
serializers.
      */
     public static final String TOPOLOGY_KRYO_REGISTER = 
"topology.kryo.register";
     public static final Object TOPOLOGY_KRYO_REGISTER_SCHEMA = 
ConfigValidation.KryoRegValidator;
 
     /**
-     * A list of classes that customize storm's kryo instance during start-up.
-     * Each listed class name must implement IKryoDecorator. During start-up 
the
-     * listed class is instantiated with 0 arguments, then its 'decorate' 
method
-     * is called with storm's kryo instance as the only argument.
+     * A list of classes that customize storm's kryo instance during start-up. 
Each listed class name must implement IKryoDecorator. During start-up the listed
+     * class is instantiated with 0 arguments, then its 'decorate' method is 
called with storm's kryo instance as the only argument.
      */
     public static final String TOPOLOGY_KRYO_DECORATORS = 
"topology.kryo.decorators";
     public static final Object TOPOLOGY_KRYO_DECORATORS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * Class that specifies how to create a Kryo instance for serialization. 
Storm will then apply
-     * topology.kryo.register and topology.kryo.decorators on top of this. The 
default implementation
-     * implements topology.fall.back.on.java.serialization and turns 
references off.
+     * Class that specifies how to create a Kryo instance for serialization. 
Storm will then apply topology.kryo.register and topology.kryo.decorators on 
top of
+     * this. The default implementation implements 
topology.fall.back.on.java.serialization and turns references off.
      */
     public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory";
     public static final Object TOPOLOGY_KRYO_FACTORY_SCHEMA = String.class;
 
-
     /**
-     * Whether or not Storm should skip the loading of kryo registrations for 
which it
-     * does not know the class or have the serializer implementation. 
Otherwise, the task will
-     * fail to load and will throw an error at runtime. The use case of this 
is if you want to
-     * declare your serializations on the storm.yaml files on the cluster 
rather than every single
-     * time you submit a topology. Different applications may use different 
serializations and so
-     * a single application may not have the code for the other serializers 
used by other apps.
-     * By setting this config to true, Storm will ignore that it doesn't have 
those other serializations
-     * rather than throw an error.
+     * Whether or not Storm should skip the loading of kryo registrations for 
which it does not know the class or have the serializer implementation. 
Otherwise,
+     * the task will fail to load and will throw an error at runtime. The use 
case of this is if you want to declare your serializations on the storm.yaml 
files
+     * on the cluster rather than every single time you submit a topology. 
Different applications may use different serializations and so a single 
application
+     * may not have the code for the other serializers used by other apps. By 
setting this config to true, Storm will ignore that it doesn't have those other
+     * serializations rather than throw an error.
      */
-    public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= 
"topology.skip.missing.kryo.registrations";
+    public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = 
"topology.skip.missing.kryo.registrations";
     public static final Object TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS_SCHEMA 
= Boolean.class;
 
     /*
-     * A list of classes implementing IMetricsConsumer (See storm.yaml.example 
for exact config format).
-     * Each listed class will be routed all the metrics data generated by the 
storm metrics API.
-     * Each listed class maps 1:1 to a system bolt named 
__metrics_ClassName#N, and it's parallelism is configurable.
+     * A list of classes implementing IMetricsConsumer (See storm.yaml.example 
for exact config format). Each listed class will be routed all the metrics data
+     * generated by the storm metrics API. Each listed class maps 1:1 to a 
system bolt named __metrics_ClassName#N, and its parallelism is configurable.
      */
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = 
"topology.metrics.consumer.register";
     public static final Object TOPOLOGY_METRICS_CONSUMER_REGISTER_SCHEMA = 
ConfigValidation.MapsValidator;
 
-
     /**
-     * The maximum parallelism allowed for a component in this topology. This 
configuration is
-     * typically used in testing to limit the number of threads spawned in 
local mode.
+     * The maximum parallelism allowed for a component in this topology. This 
configuration is typically used in testing to limit the number of threads 
spawned
+     * in local mode.
      */
-    public static final String 
TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
+    public static final String TOPOLOGY_MAX_TASK_PARALLELISM = 
"topology.max.task.parallelism";
     public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
-     * The maximum number of tuples that can be pending on a spout task at any 
given time.
-     * This config applies to individual tasks, not to spouts or topologies as 
a whole.
-     *
-     * A pending tuple is one that has been emitted from a spout but has not 
been acked or failed yet.
-     * Note that this config parameter has no effect for unreliable spouts 
that don't tag
-     * their tuples with a message id.
+     * The maximum number of tuples that can be pending on a spout task at any 
given time. This config applies to individual tasks, not to spouts or topologies
+     * as a whole.
+     * <p/>
+     * A pending tuple is one that has been emitted from a spout but has not 
been acked or failed yet. Note that this config parameter has no effect for
+     * unreliable spouts that don't tag their tuples with a message id.
      */
-    public static final String 
TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
+    public static final String TOPOLOGY_MAX_SPOUT_PENDING = 
"topology.max.spout.pending";
     public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * A class that implements a strategy for what to do when a spout needs to 
wait. Waiting is
-     * triggered in one of two conditions:
-     *
-     * 1. nextTuple emits no tuples
-     * 2. The spout has hit maxSpoutPending and can't emit any more tuples
+     * A class that implements a strategy for what to do when a spout needs to 
wait. Waiting is triggered in one of two conditions:
+     * <p/>
+     * 1. nextTuple emits no tuples 2. The spout has hit maxSpoutPending and 
can't emit any more tuples
      */
-    public static final String 
TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
+    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY = 
"topology.spout.wait.strategy";
     public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = 
String.class;
 
     /**
+     * Configure the wait timeout used for timeout blocking wait strategy.
+     */
+    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT = 
"topology.disruptor.wait.timeout";
+    public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_SCHEMA = 
Number.class;
+
+    /**
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
-    public static final String 
TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
+    public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS = 
"topology.sleep.spout.wait.strategy.time.ms";
     public static final Object 
TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * The maximum amount of time a component gives a source of state to 
synchronize before it requests
-     * synchronization again.
+     * The maximum amount of time a component gives a source of state to 
synchronize before it requests synchronization again.
      */
-    public static final String 
TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
+    public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS = 
"topology.state.synchronization.timeout.secs";
     public static final Object 
TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The percentage of tuples to sample to produce stats for a task.
      */
-    public static final String 
TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
+    public static final String TOPOLOGY_STATS_SAMPLE_RATE = 
"topology.stats.sample.rate";
     public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = 
ConfigValidation.DoubleValidator;
 
     /**
      * The time period that builtin metrics data in bucketed into.
      */
-    public static final String 
TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
+    public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS = 
"topology.builtin.metrics.bucket.size.secs";
     public static final Object 
TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not to use Java serialization in a topology.
      */
-    public static final String 
TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
+    public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = 
"topology.fall.back.on.java.serialization";
     public static final Object TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION_SCHEMA 
= Boolean.class;
 
     /**
+     * Whether or not classes need to be registered for kryo serialization in a 
topology.
+     */
+    public static final String TOPOLOGY_KRYO_REGISTER_REQUIRED =
+            "topology.kryo.register.required";
+    public static final Object TOPOLOGY_KRYO_REGISTER_REQUIRED_SCHEMA =
+            Boolean.class;
+
+    /**
      * Topology-specific options for the worker child process. This is used in 
addition to WORKER_CHILDOPTS.
      */
-    public static final String 
TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts";
+    public static final String TOPOLOGY_WORKER_CHILDOPTS = 
"topology.worker.childopts";
     public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = 
ConfigValidation.StringOrStringListValidator;
 
     /**
      * Topology-specific options GC for the worker child process. This 
overrides WORKER_GC_CHILDOPTS.
      */
-    public static final String 
TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts";
+    public static final String TOPOLOGY_WORKER_GC_CHILDOPTS = 
"topology.worker.gc.childopts";
     public static final Object TOPOLOGY_WORKER_GC_CHILDOPTS_SCHEMA = 
ConfigValidation.StringOrStringListValidator;
 
     /**
      * Topology-specific classpath for the worker child process. This is 
combined to the usual classpath.
      */
-    public static final String TOPOLOGY_CLASSPATH="topology.classpath";
+    public static final String TOPOLOGY_CLASSPATH = "topology.classpath";
     public static final Object TOPOLOGY_CLASSPATH_SCHEMA = 
ConfigValidation.StringOrStringListValidator;
 
     /**
-     * Topology-specific environment variables for the worker child process.
-     * This is added to the existing environment (that of the supervisor)
+     * Topology-specific environment variables for the worker child process. 
This is added to the existing environment (that of the supervisor)
      */
-     public static final String TOPOLOGY_ENVIRONMENT="topology.environment";
-     public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class;
+    public static final String TOPOLOGY_ENVIRONMENT = "topology.environment";
+    public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class;
 
     /*
-     * Topology-specific option to disable/enable bolt's outgoing overflow 
buffer.
-     * Enabling this option ensures that the bolt can always clear the 
incoming messages,
-     * preventing live-lock for the topology with cyclic flow.
-     * The overflow buffer can fill degrading the performance gradually,
-     * eventually running out of memory.
+     * Topology-specific option to disable/enable bolt's outgoing overflow 
buffer. Enabling this option ensures that the bolt can always clear the incoming
+     * messages, preventing live-lock for the topology with cyclic flow. The 
overflow buffer can fill degrading the performance gradually, eventually running
+     * out of memory.
      */
-    public static final String 
TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable";
+    public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE 
= "topology.bolts.outgoing.overflow.buffer.enable";
     public static final Object 
TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE_SCHEMA = Boolean.class;
 
     /**
-     * This config is available for TransactionalSpouts, and contains the id ( 
a String) for
-     * the transactional topology. This id is used to store the state of the 
transactional
-     * topology in Zookeeper.
+     * This config is available for TransactionalSpouts, and contains the id ( 
a String) for the transactional topology. This id is used to store the state of
+     * the transactional topology in Zookeeper.
      */
-    public static final String 
TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id";
+    public static final String TOPOLOGY_TRANSACTIONAL_ID = 
"topology.transactional.id";
     public static final Object TOPOLOGY_TRANSACTIONAL_ID_SCHEMA = String.class;
 
     /**
-     * A list of task hooks that are automatically added to every spout and 
bolt in the topology. An example
-     * of when you'd do this is to add a hook that integrates with your 
internal
-     * monitoring system. These hooks are instantiated using the zero-arg 
constructor.
+     * A list of task hooks that are automatically added to every spout and 
bolt in the topology. An example of when you'd do this is to add a hook that
+     * integrates with your internal monitoring system. These hooks are 
instantiated using the zero-arg constructor.
      */
-    public static final String 
TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
+    public static final String TOPOLOGY_AUTO_TASK_HOOKS = 
"topology.auto.task.hooks";
     public static final Object TOPOLOGY_AUTO_TASK_HOOKS_SCHEMA = 
ConfigValidation.StringsValidator;
 
-
     /**
      * The size of the Disruptor receive queue for each executor. Must be a 
power of 2.
      */
-    public static final String 
TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size";
+    public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE = 
"topology.executor.receive.buffer.size";
     public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = 
ConfigValidation.PowerOf2Validator;
 
     /**
-     * The maximum number of messages to batch from the thread receiving off 
the network to the
-     * executor queues. Must be a power of 2.
+     * The maximum number of messages to batch from the thread receiving off 
the network to the executor queues. Must be a power of 2.
      */
-    public static final String 
TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
+    public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE = 
"topology.receiver.buffer.size";
     public static final Object TOPOLOGY_RECEIVER_BUFFER_SIZE_SCHEMA = 
ConfigValidation.PowerOf2Validator;
 
     /**
      * The size of the Disruptor send queue for each executor. Must be a power 
of 2.
      */
-    public static final String 
TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size";
+    public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE = 
"topology.executor.send.buffer.size";
     public static final Object TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE_SCHEMA = 
ConfigValidation.PowerOf2Validator;
 
     /**
      * The size of the Disruptor transfer queue for each worker.
      */
-    public static final String 
TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
+    public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE = 
"topology.transfer.buffer.size";
     public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-   /**
-    * How often a tick tuple from the "__system" component and "__tick" stream 
should be sent
-    * to tasks. Meant to be used as a component-specific configuration.
-    */
-    public static final String 
TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
+    /**
+     * How often a tick tuple from the "__system" component and "__tick" 
stream should be sent to tasks. Meant to be used as a component-specific 
configuration.
+     */
+    public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = 
"topology.tick.tuple.freq.secs";
     public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
-   /**
-    * Configure the wait strategy used for internal queuing. Can be used to 
tradeoff latency
-    * vs. throughput
-    */
-    public static final String 
TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";
+    /**
+     * Configure the wait strategy used for internal queuing. Can be used to 
tradeoff latency vs. throughput
+     */
+    public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY = 
"topology.disruptor.wait.strategy";
     public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = 
String.class;
 
     /**
-     * Configure the wait timeout used for timeout blocking wait strategy.
+     * The size of the shared thread pool for worker tasks to make use of. The 
thread pool can be accessed via the TopologyContext.
      */
-    public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT =
-            "topology.disruptor.wait.timeout";
-    public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_SCHEMA =
-            Number.class;
-    
-    /*
-    * The size of the shared thread pool for worker tasks to make use of. The 
thread pool can be accessed
-    * via the TopologyContext.
-    */
-    public static final String 
TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
+    public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE = 
"topology.worker.shared.thread.pool.size";
     public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA 
= ConfigValidation.IntegerValidator;
 
     /**
-     * The interval in seconds to use for determining whether to throttle 
error reported to Zookeeper. For example,
-     * an interval of 10 seconds with topology.max.error.report.per.interval 
set to 5 will only allow 5 errors to be
-     * reported to Zookeeper per task for every 10 second interval of time.
+     * The interval in seconds to use for determining whether to throttle 
errors reported to Zookeeper. For example, an interval of 10 seconds with
+     * topology.max.error.report.per.interval set to 5 will only allow 5 
errors to be reported to Zookeeper per task for every 10 second interval of 
time.
      */
-    public static final String 
TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";
+    public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS = 
"topology.error.throttle.interval.secs";
     public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
      */
-    public static final String 
TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
+    public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL = 
"topology.max.error.report.per.interval";
     public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = 
ConfigValidation.IntegerValidator;
 
-
     /**
      * How often a batch can be emitted in a Trident topology.
      */
-    public static final String 
TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
+    public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS = 
"topology.trident.batch.emit.interval.millis";
     public static final Object 
TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * Name of the topology. This config is automatically set by Storm when 
the topology is submitted.
      */
-    public final static String TOPOLOGY_NAME="topology.name";
+    public final static String TOPOLOGY_NAME = "topology.name";
     public static final Object TOPOLOGY_NAME_SCHEMA = String.class;
 
     /**
@@ -1313,33 +1224,31 @@ public class Config extends HashMap<String, Object> {
     /**
      * Max pending tuples in one ShellBolt
      */
-    public static final String 
TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
+    public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING = 
"topology.shellbolt.max.pending";
     public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The root directory in ZooKeeper for metadata about TransactionalSpouts.
      */
-    public static final String 
TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root";
+    public static final String TRANSACTIONAL_ZOOKEEPER_ROOT = 
"transactional.zookeeper.root";
     public static final Object TRANSACTIONAL_ZOOKEEPER_ROOT_SCHEMA = 
String.class;
 
     /**
-     * The list of zookeeper servers in which to keep the transactional state. 
If null (which is default),
-     * will use storm.zookeeper.servers
+     * The list of zookeeper servers in which to keep the transactional state. 
If null (which is default), will use storm.zookeeper.servers
      */
-    public static final String 
TRANSACTIONAL_ZOOKEEPER_SERVERS="transactional.zookeeper.servers";
+    public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS = 
"transactional.zookeeper.servers";
     public static final Object TRANSACTIONAL_ZOOKEEPER_SERVERS_SCHEMA = 
ConfigValidation.StringsValidator;
 
     /**
-     * The port to use to connect to the transactional zookeeper servers. If 
null (which is default),
-     * will use storm.zookeeper.port
+     * The port to use to connect to the transactional zookeeper servers. If 
null (which is default), will use storm.zookeeper.port
      */
-    public static final String 
TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
+    public static final String TRANSACTIONAL_ZOOKEEPER_PORT = 
"transactional.zookeeper.port";
     public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
      * The user as which the nimbus client should be acquired to perform the 
operation.
      */
-    public static final String STORM_DO_AS_USER="storm.doAsUser";
+    public static final String STORM_DO_AS_USER = "storm.doAsUser";
     public static final Object STORM_DO_AS_USER_SCHEMA = String.class;
 
     /**
@@ -1349,58 +1258,54 @@ public class Config extends HashMap<String, Object> {
     public static final Object ZMQ_THREADS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * How long a connection should retry sending messages to a target host 
when
-     * the connection is closed. This is an advanced configuration and can 
almost
+     * How long a connection should retry sending messages to a target host 
when the connection is closed. This is an advanced configuration and can almost
      * certainly be ignored.
      */
     public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
     public static final Object ZMQ_LINGER_MILLIS_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * The high water for the ZeroMQ push sockets used for networking. Use 
this config to prevent buffer explosion
-     * on the networking layer.
+     * The high water for the ZeroMQ push sockets used for networking. Use 
this config to prevent buffer explosion on the networking layer.
      */
     public static final String ZMQ_HWM = "zmq.hwm";
     public static final Object ZMQ_HWM_SCHEMA = 
ConfigValidation.IntegerValidator;
 
     /**
-     * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and 
Workers)
-     * for the java.library.path value. java.library.path tells the JVM where
-     * to look for native libraries. It is necessary to set this config 
correctly since
-     * Storm uses the ZeroMQ and JZMQ native libs.
+     * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and 
Workers) for the java.library.path value. java.library.path tells the JVM where 
to
+     * look for native libraries. It is necessary to set this config correctly 
since Storm uses the ZeroMQ and JZMQ native libs.
      */
     public static final String JAVA_LIBRARY_PATH = "java.library.path";
     public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
 
     /**
-     * The path to use as the zookeeper dir when running a zookeeper server via
-     * "storm dev-zookeeper". This zookeeper instance is only intended for 
development;
+     * The path to use as the zookeeper dir when running a zookeeper server 
via "storm dev-zookeeper". This zookeeper instance is only intended for 
development;
      * it is not a production grade zookeeper setup.
      */
     public static final String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path";
     public static final Object DEV_ZOOKEEPER_PATH_SCHEMA = String.class;
 
     /**
-     * A map from topology name to the number of machines that should be 
dedicated for that topology. Set storm.scheduler
-     * to backtype.storm.scheduler.IsolationScheduler to make use of the 
isolation scheduler.
+     * A map from topology name to the number of machines that should be 
dedicated for that topology. Set storm.scheduler to
+     * backtype.storm.scheduler.IsolationScheduler to make use of the 
isolation scheduler.
      */
     public static final String ISOLATION_SCHEDULER_MACHINES = 
"isolation.scheduler.machines";
     public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = 
ConfigValidation.MapOfStringToNumberValidator;
 
     /**
-     * A map from the user name to the number of machines that should that 
user is allowed to use. Set storm.scheduler
-     * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+     * A map from the user name to the number of machines that 
user is allowed to use. Set storm.scheduler to
+     * backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
     public static final String MULTITENANT_SCHEDULER_USER_POOLS = 
"multitenant.scheduler.user.pools";
     public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = 
ConfigValidation.MapOfStringToNumberValidator;
 
     /**
-     * The number of machines that should be used by this topology to isolate 
it from all others. Set storm.scheduler
-     * to backtype.storm.scheduler.multitenant.MultitenantScheduler
+     * The number of machines that should be used by this topology to isolate 
it from all others. Set storm.scheduler to
+     * backtype.storm.scheduler.multitenant.MultitenantScheduler
      */
     public static final String TOPOLOGY_ISOLATED_MACHINES = 
"topology.isolate.machines";
     public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = 
Number.class;
 
+
     public static void setClasspath(Map conf, String cp) {
         conf.put(Config.TOPOLOGY_CLASSPATH, cp);
     }
@@ -1473,14 +1378,16 @@ public class Config extends HashMap<String, Object> {
         m.put("parallelism.hint", parallelismHint);
         m.put("argument", argument);
 
-        List l = (List)conf.get(TOPOLOGY_METRICS_CONSUMER_REGISTER);
-        if (l == null) { l = new ArrayList(); }
+        List l = (List) conf.get(TOPOLOGY_METRICS_CONSUMER_REGISTER);
+        if (l == null) {
+            l = new ArrayList();
+        }
         l.add(m);
         conf.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l);
     }
 
     public void registerMetricsConsumer(Class klass, Object argument, long 
parallelismHint) {
-       registerMetricsConsumer(this, klass, argument, parallelismHint);
+        registerMetricsConsumer(this, klass, argument, parallelismHint);
     }
 
     public static void registerMetricsConsumer(Map conf, Class klass, long 
parallelismHint) {
@@ -1520,7 +1427,7 @@ public class Config extends HashMap<String, Object> {
     }
 
     public void setSkipMissingKryoRegistrations(boolean skip) {
-       setSkipMissingKryoRegistrations(this, skip);
+        setSkipMissingKryoRegistrations(this, skip);
     }
 
     public static void setMaxTaskParallelism(Map conf, int max) {
@@ -1557,7 +1464,7 @@ public class Config extends HashMap<String, Object> {
 
     private static List getRegisteredSerializations(Map conf) {
         List ret;
-        if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
+        if (!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
             ret = new ArrayList();
         } else {
             ret = new ArrayList((List) 
conf.get(Config.TOPOLOGY_KRYO_REGISTER));
@@ -1568,7 +1475,7 @@ public class Config extends HashMap<String, Object> {
 
     private static List getRegisteredDecorators(Map conf) {
         List ret;
-        if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
+        if (!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
             ret = new ArrayList();
         } else {
             ret = new ArrayList((List) 
conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
@@ -1576,4 +1483,12 @@ public class Config extends HashMap<String, Object> {
         conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
         return ret;
     }
+
+    public static void setKryoRegisterRequired(Map conf, boolean required) {
+        conf.put(Config.TOPOLOGY_KRYO_REGISTER_REQUIRED, required);
+    }
+
+    public void setKryoRegisterRequired(boolean fallback) {
+        setKryoRegisterRequired(this, fallback);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java 
b/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java
index 24991d7..9fe2f69 100755
--- a/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java
+++ b/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package backtype.storm;
+
 import java.util.Map;
 
 import java.util.Map;
@@ -31,13 +32,14 @@ public class ConfigValidation {
     public static interface FieldValidator {
         /**
          * Validates the given field.
+         * 
          * @param name the name of the field.
          * @param field The field to be validated.
          * @throws IllegalArgumentException if the field fails validation.
          */
         public void validateField(String name, Object field) throws 
IllegalArgumentException;
     }
-    
+
     /**
      * Declares a method for validating configuration values that is nestable.
      */
@@ -46,9 +48,10 @@ public class ConfigValidation {
         public void validateField(String name, Object field) throws 
IllegalArgumentException {
             validateField(null, name, field);
         }
-        
+
         /**
          * Validates the given field.
+         * 
          * @param pd describes the parent wrapping this validator.
          * @param name the name of the field.
          * @param field The field to be validated.
@@ -59,6 +62,7 @@ public class ConfigValidation {
 
     /**
      * Returns a new NestableFieldValidator for a given class.
+     * 
      * @param cls the Class the field should be a type of
      * @param nullAllowed whether or not a value of null is valid
      * @return a NestableFieldValidator for that class
@@ -66,99 +70,93 @@ public class ConfigValidation {
     public static NestableFieldValidator fv(final Class cls, final boolean 
nullAllowed) {
         return new NestableFieldValidator() {
             @Override
-            public void validateField(String pd, String name, Object field)
-                    throws IllegalArgumentException {
+            public void validateField(String pd, String name, Object field) 
throws IllegalArgumentException {
                 if (nullAllowed && field == null) {
                     return;
                 }
-                if (! cls.isInstance(field)) {
-                    throw new IllegalArgumentException(
-                        pd + name + " must be a " + cls.getName() + ". 
("+field+")");
+                if (!cls.isInstance(field)) {
+                    throw new IllegalArgumentException(pd + name + " must be a 
" + cls.getName() + ". (" + field + ")");
                 }
             }
         };
     }
-    
+
     /**
      * Returns a new NestableFieldValidator for a List of the given Class.
+     * 
      * @param cls the Class of elements composing the list
      * @param nullAllowed whether or not a value of null is valid
      * @return a NestableFieldValidator for a list of the given class
      */
     public static NestableFieldValidator listFv(Class cls, boolean 
nullAllowed) {
-      return listFv(fv(cls, false), nullAllowed);
+        return listFv(fv(cls, false), nullAllowed);
     }
-    
+
     /**
      * Returns a new NestableFieldValidator for a List where each item is 
validated by validator.
+     * 
      * @param validator used to validate each item in the list
      * @param nullAllowed whether or not a value of null is valid
      * @return a NestableFieldValidator for a list with each item validated by 
a different validator.
      */
-    public static NestableFieldValidator listFv(final NestableFieldValidator 
validator, 
-            final boolean nullAllowed) {
+    public static NestableFieldValidator listFv(final NestableFieldValidator 
validator, final boolean nullAllowed) {
         return new NestableFieldValidator() {
             @Override
-            public void validateField(String pd, String name, Object field)
-                    throws IllegalArgumentException {
+            public void validateField(String pd, String name, Object field) 
throws IllegalArgumentException {
                 if (nullAllowed && field == null) {
                     return;
                 }
                 if (field instanceof Iterable) {
-                    for (Object e : (Iterable)field) {
+                    for (Object e : (Iterable) field) {
                         validator.validateField(pd + "Each element of the list 
", name, e);
                     }
                     return;
                 }
-                throw new IllegalArgumentException(
-                        "Field " + name + " must be an Iterable but was " +
-                        ((field == null) ? "null" :  ("a " + 
field.getClass())));
+                throw new IllegalArgumentException("Field " + name + " must be 
an Iterable but was " + ((field == null) ? "null" : ("a " + field.getClass())));
             }
         };
     }
 
     /**
      * Returns a new NestableFieldValidator for a Map of key to val.
+     * 
      * @param key the Class of keys in the map
      * @param val the Class of values in the map
      * @param nullAllowed whether or not a value of null is valid
      * @return a NestableFieldValidator for a Map of key to val
      */
-    public static NestableFieldValidator mapFv(Class key, Class val, 
-            boolean nullAllowed) {
+    public static NestableFieldValidator mapFv(Class key, Class val, boolean 
nullAllowed) {
         return mapFv(fv(key, false), fv(val, false), nullAllowed);
     }
- 
+
     /**
      * Returns a new NestableFieldValidator for a Map.
+     * 
      * @param key a validator for the keys in the map
      * @param val a validator for the values in the map
      * @param nullAllowed whether or not a value of null is valid
      * @return a NestableFieldValidator for a Map
-     */   
-    public static NestableFieldValidator mapFv(final NestableFieldValidator 
key, 
-            final NestableFieldValidator val, final boolean nullAllowed) {
+     */
+    public static NestableFieldValidator mapFv(final NestableFieldValidator 
key, final NestableFieldValidator val, final boolean nullAllowed) {
         return new NestableFieldValidator() {
             @SuppressWarnings("unchecked")
             @Override
-            public void validateField(String pd, String name, Object field)
-                    throws IllegalArgumentException {
+            public void validateField(String pd, String name, Object field) 
throws IllegalArgumentException {
                 if (nullAllowed && field == null) {
                     return;
                 }
                 if (field instanceof Map) {
-                    for (Map.Entry<Object, Object> entry: ((Map<Object, 
Object>)field).entrySet()) {
-                      key.validateField("Each key of the map ", name, 
entry.getKey());
-                      val.validateField("Each value in the map ", name, 
entry.getValue());
+                    for (Map.Entry<Object, Object> entry : ((Map<Object, 
Object>) field).entrySet()) {
+                        key.validateField("Each key of the map ", name, 
entry.getKey());
+                        val.validateField("Each value in the map ", name, 
entry.getValue());
                     }
                     return;
                 }
-                throw new IllegalArgumentException(
-                        "Field " + name + " must be a Map");
+                throw new IllegalArgumentException("Field " + name + " must be 
a Map");
             }
         };
     }
-    
+
     /**
      * Validates a list of Numbers.
      */
@@ -175,8 +173,7 @@ public class ConfigValidation {
     public static Object MapOfStringToNumberValidator = mapFv(String.class, 
Number.class, true);
 
     /**
-     * Validates a map of Strings to a map of Strings to a list.
-     * {str -> {str -> [str,str]}
+     * Validates a map of Strings to a map of Strings to a list. {str -> {str 
-> [str,str]}}
      */
     public static Object MapOfStringToMapValidator = mapFv(fv(String.class, 
false), mapFv(fv(String.class, false), listFv(String.class, false), false), 
true);
 
@@ -196,8 +193,7 @@ public class ConfigValidation {
                 return;
             }
             final long i;
-            if (o instanceof Number &&
-                    (i = ((Number)o).longValue()) == 
((Number)o).doubleValue()) {
+            if (o instanceof Number && (i = ((Number) o).longValue()) == 
((Number) o).doubleValue()) {
                 if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) {
                     return;
                 }
@@ -212,22 +208,19 @@ public class ConfigValidation {
      */
     public static Object IntegersValidator = new FieldValidator() {
         @Override
-        public void validateField(String name, Object field)
-                throws IllegalArgumentException {
+        public void validateField(String name, Object field) throws 
IllegalArgumentException {
             if (field == null) {
                 // A null value is acceptable.
                 return;
             }
             if (field instanceof Iterable) {
-                for (Object o : (Iterable)field) {
+                for (Object o : (Iterable) field) {
                     final long i;
-                    if (o instanceof Number &&
-                            ((i = ((Number)o).longValue()) == 
((Number)o).doubleValue()) &&
-                            (i <= Integer.MAX_VALUE && i >= 
Integer.MIN_VALUE)) {
+                    if (o instanceof Number && ((i = ((Number) o).longValue()) 
== ((Number) o).doubleValue())
+                            && (i <= Integer.MAX_VALUE && i >= 
Integer.MIN_VALUE)) {
                         // pass the test
                     } else {
-                        throw new IllegalArgumentException(
-                                "Each element of the list " + name + " must be 
an Integer within type range.");
+                        throw new IllegalArgumentException("Each element of 
the list " + name + " must be an Integer within type range.");
                     }
                 }
                 return;
@@ -266,11 +259,9 @@ public class ConfigValidation {
                 return;
             }
             final long i;
-            if (o instanceof Number &&
-                    (i = ((Number)o).longValue()) == ((Number)o).doubleValue())
-            {
+            if (o instanceof Number && (i = ((Number) o).longValue()) == 
((Number) o).doubleValue()) {
                 // Test whether the integer is a power of 2.
-                if (i > 0 && (i & (i-1)) == 0) {
+                if (i > 0 && (i & (i - 1)) == 0) {
                     return;
                 }
             }
@@ -289,9 +280,7 @@ public class ConfigValidation {
                 return;
             }
             final long i;
-            if (o instanceof Number &&
-                    (i = ((Number)o).longValue()) == ((Number)o).doubleValue())
-            {
+            if (o instanceof Number && (i = ((Number) o).longValue()) == 
((Number) o).doubleValue()) {
                 if (i > 0) {
                     return;
                 }
@@ -311,24 +300,20 @@ public class ConfigValidation {
                 return;
             }
             if (o instanceof Iterable) {
-                for (Object e : (Iterable)o) {
+                for (Object e : (Iterable) o) {
                     if (e instanceof Map) {
-                        for (Map.Entry<Object,Object> entry: 
((Map<Object,Object>)e).entrySet()) {
-                            if (!(entry.getKey() instanceof String) ||
-                                !(entry.getValue() instanceof String)) {
-                                throw new IllegalArgumentException(
-                                    "Each element of the list " + name + " 
must be a String or a Map of Strings");
+                        for (Map.Entry<Object, Object> entry : ((Map<Object, 
Object>) e).entrySet()) {
+                            if (!(entry.getKey() instanceof String) || 
!(entry.getValue() instanceof String)) {
+                                throw new IllegalArgumentException("Each 
element of the list " + name + " must be a String or a Map of Strings");
                             }
                         }
                     } else if (!(e instanceof String)) {
-                        throw new IllegalArgumentException(
-                                "Each element of the list " + name + " must be 
a String or a Map of Strings");
+                        throw new IllegalArgumentException("Each element of 
the list " + name + " must be a String or a Map of Strings");
                     }
                 }
                 return;
             }
-            throw new IllegalArgumentException(
-                    "Field " + name + " must be an Iterable containing only 
Strings or Maps of Strings");
+            throw new IllegalArgumentException("Field " + name + " must be an 
Iterable containing only Strings or Maps of Strings");
         }
     };
 

Reply via email to