http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Config.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/Config.java b/jstorm-core/src/main/java/backtype/storm/Config.java index 4273908..0e04b95 100644 --- a/jstorm-core/src/main/java/backtype/storm/Config.java +++ b/jstorm-core/src/main/java/backtype/storm/Config.java @@ -17,10 +17,8 @@ */ package backtype.storm; -import backtype.storm.ConfigValidation; import backtype.storm.serialization.IKryoDecorator; import backtype.storm.serialization.IKryoFactory; - import com.esotericsoftware.kryo.Serializer; import java.util.ArrayList; @@ -29,28 +27,25 @@ import java.util.List; import java.util.Map; /** - * Topology configs are specified as a plain old map. This class provides a - * convenient way to create a topology config map by providing setter methods for - * all the configs that can be set. It also makes it easier to do things like add - * serializations. - * - * <p>This class also provides constants for all the configurations possible on - * a Storm cluster and Storm topology. Each constant is paired with a schema - * that defines the validity criterion of the corresponding field. Default - * values for these configs can be found in defaults.yaml.</p> - * - * <p>Note that you may put other configurations in any of the configs. Storm - * will ignore anything it doesn't recognize, but your topologies are free to make - * use of them by reading them in the prepare method of Bolts or the open method of - * Spouts.</p> + * Topology configs are specified as a plain old map. This class provides a convenient way to create a topology config map by providing setter methods for all + * the configs that can be set. It also makes it easier to do things like add serializations. 
+ * <p/> + * <p> + * This class also provides constants for all the configurations possible on a Storm cluster and Storm topology. Each constant is paired with a schema that + * defines the validity criterion of the corresponding field. Default values for these configs can be found in defaults.yaml. + * </p> + * <p/> + * <p> + * Note that you may put other configurations in any of the configs. Storm will ignore anything it doesn't recognize, but your topologies are free to make use + * of them by reading them in the prepare method of Bolts or the open method of Spouts. + * </p> */ public class Config extends HashMap<String, Object> { - //DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS + // DO NOT CHANGE UNLESS WE ADD IN STATE NOT STORED IN THE PARENT CLASS private static final long serialVersionUID = -1550278723792864455L; /** - * This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for - * the user Nimbus and Supervisors use to authenticate with ZK. + * This is part of a temporary workaround to a ZK bug, it is the 'scheme:acl' for the user Nimbus and Supervisors use to authenticate with ZK. 
*/ public static final String STORM_ZOOKEEPER_SUPERACL = "storm.zookeeper.superACL"; public static final Object STORM_ZOOKEEPER_SUPERACL_SCHEMA = String.class; @@ -104,7 +99,8 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator; /** - * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes + * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE + * bytes */ public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size"; public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = ConfigValidation.IntegerValidator; @@ -122,8 +118,8 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_MESSAGING_NETTY_AUTHENTICATION_SCHEMA = Boolean.class; /** - * The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk. - * This is NOT used for compressing serialized tuples sent between topologies. + * The delegate for serializing metadata, should be used for serialized objects stored in zookeeper and on disk. This is NOT used for compressing serialized + * tuples sent between topologies. */ public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate"; public static final Object STORM_META_SERIALIZATION_DELEGATE_SCHEMA = String.class; @@ -141,16 +137,15 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator; /** - * A directory on the local filesystem used by Storm for any local - * filesystem usage it needs. 
The directory must exist and the Storm daemons must - * have permission to read/write from this location. + * A directory on the local filesystem used by Storm for any local filesystem usage it needs. The directory must exist and the Storm daemons must have + * permission to read/write from this location. */ public static final String STORM_LOCAL_DIR = "storm.local.dir"; public static final Object STORM_LOCAL_DIR_SCHEMA = String.class; /** * A global task scheduler used to assign topologies's tasks to supervisors' wokers. - * + * <p/> * If this is not set, a default system scheduler will be used. */ public static final String STORM_SCHEDULER = "storm.scheduler"; @@ -163,11 +158,10 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class; /** - * The hostname the supervisors/workers should report to nimbus. If unset, Storm will - * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>. - * - * You should set this config when you dont have a DNS which supervisors/workers - * can utilize to find each other based on hostname got from calls to + * The hostname the supervisors/workers should report to nimbus. If unset, Storm will get the hostname to report by calling + * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>. + * <p/> + * You should set this config when you dont have a DNS which supervisors/workers can utilize to find each other based on hostname got from calls to * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>. */ public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname"; @@ -198,25 +192,22 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class; /** - * The serializer class for ListDelegate (tuple payload). - * The default serializer will be ListDelegateSerializer + * The serializer class for ListDelegate (tuple payload). 
The default serializer will be ListDelegateSerializer */ public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer"; public static final Object TOPOLOGY_TUPLE_SERIALIZER_SCHEMA = String.class; /** - * Try to serialize all tuples, even for local transfers. This should only be used - * for testing, as a sanity check that all of your tuples are setup properly. + * Try to serialize all tuples, even for local transfers. This should only be used for testing, as a sanity check that all of your tuples are setup + * properly. */ public static final String TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE = "topology.testing.always.try.serialize"; public static final Object TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE_SCHEMA = Boolean.class; /** - * Whether or not to use ZeroMQ for messaging in local mode. If this is set - * to false, then Storm will use a pure-Java messaging system. The purpose - * of this flag is to make it easy to run Storm in local mode by eliminating - * the need for native dependencies, which can be difficult to install. - * + * Whether or not to use ZeroMQ for messaging in local mode. If this is set to false, then Storm will use a pure-Java messaging system. The purpose of this + * flag is to make it easy to run Storm in local mode by eliminating the need for native dependencies, which can be difficult to install. + * <p/> * Defaults to false. */ public static final String STORM_LOCAL_MODE_ZMQ = "storm.local.mode.zmq"; @@ -243,49 +234,45 @@ public class Config extends HashMap<String, Object> { /** * The number of times to retry a Zookeeper operation. */ - public static final String STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times"; + public static final String STORM_ZOOKEEPER_RETRY_TIMES = "storm.zookeeper.retry.times"; public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator; /** * The interval between retries of a Zookeeper operation. 
*/ - public static final String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval"; + public static final String STORM_ZOOKEEPER_RETRY_INTERVAL = "storm.zookeeper.retry.interval"; public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator; /** * The ceiling of the interval between retries of a Zookeeper operation. */ - public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis"; + public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING = "storm.zookeeper.retry.intervalceiling.millis"; public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator; /** * The cluster Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication. */ - public static final String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme"; + public static final String STORM_ZOOKEEPER_AUTH_SCHEME = "storm.zookeeper.auth.scheme"; public static final Object STORM_ZOOKEEPER_AUTH_SCHEME_SCHEMA = String.class; /** - * A string representing the payload for cluster Zookeeper authentication. - * It gets serialized using UTF-8 encoding during authentication. - * Note that if this is set to something with a secret (as when using - * digest authentication) then it should only be set in the - * storm-cluster-auth.yaml file. - * This file storm-cluster-auth.yaml should then be protected with - * appropriate permissions that deny access from workers. + * A string representing the payload for cluster Zookeeper authentication. It gets serialized using UTF-8 encoding during authentication. Note that if this + * is set to something with a secret (as when using digest authentication) then it should only be set in the storm-cluster-auth.yaml file. This file + * storm-cluster-auth.yaml should then be protected with appropriate permissions that deny access from workers. 
*/ - public static final String STORM_ZOOKEEPER_AUTH_PAYLOAD="storm.zookeeper.auth.payload"; + public static final String STORM_ZOOKEEPER_AUTH_PAYLOAD = "storm.zookeeper.auth.payload"; public static final Object STORM_ZOOKEEPER_AUTH_PAYLOAD_SCHEMA = String.class; /** * The topology Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication. */ - public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME="storm.zookeeper.topology.auth.scheme"; + public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME = "storm.zookeeper.topology.auth.scheme"; public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME_SCHEMA = String.class; /** * A string representing the payload for topology Zookeeper authentication. It gets serialized using UTF-8 encoding during authentication. */ - public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD="storm.zookeeper.topology.auth.payload"; + public static final String STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD = "storm.zookeeper.topology.auth.payload"; public static final Object STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD_SCHEMA = String.class; /** @@ -298,19 +285,19 @@ public class Config extends HashMap<String, Object> { /** * The number of times to retry a Nimbus operation. */ - public static final String STORM_NIMBUS_RETRY_TIMES="storm.nimbus.retry.times"; + public static final String STORM_NIMBUS_RETRY_TIMES = "storm.nimbus.retry.times"; public static final Object STORM_NIMBUS_RETRY_TIMES_SCHEMA = Number.class; /** * The starting interval between exponential backoff retries of a Nimbus operation. */ - public static final String STORM_NIMBUS_RETRY_INTERVAL="storm.nimbus.retry.interval.millis"; + public static final String STORM_NIMBUS_RETRY_INTERVAL = "storm.nimbus.retry.interval.millis"; public static final Object STORM_NIMBUS_RETRY_INTERVAL_SCHEMA = Number.class; /** * The ceiling of the interval between retries of a client connect to Nimbus operation. 
*/ - public static final String STORM_NIMBUS_RETRY_INTERVAL_CEILING="storm.nimbus.retry.intervalceiling.millis"; + public static final String STORM_NIMBUS_RETRY_INTERVAL_CEILING = "storm.nimbus.retry.intervalceiling.millis"; public static final Object STORM_NIMBUS_RETRY_INTERVAL_CEILING_SCHEMA = Number.class; /** @@ -326,8 +313,7 @@ public class Config extends HashMap<String, Object> { public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class; /** - * Which port the Thrift interface of Nimbus should run on. Clients should - * connect to this port to upload jars and submit topologies. + * Which port the Thrift interface of Nimbus should run on. Clients should connect to this port to upload jars and submit topologies. */ public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port"; public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator; @@ -339,30 +325,29 @@ public class Config extends HashMap<String, Object> { public static final Object NIMBUS_THRIFT_THREADS_SCHEMA = Number.class; /** - * A list of users that are cluster admins and can run any command. To use this set - * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer + * A list of users that are cluster admins and can run any command. To use this set nimbus.authorizer to + * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer */ public static final String NIMBUS_ADMINS = "nimbus.admins"; public static final Object NIMBUS_ADMINS_SCHEMA = ConfigValidation.StringsValidator; /** - * A list of users that are the only ones allowed to run user operation on storm cluster. - * To use this set nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer + * A list of users that are the only ones allowed to run user operation on storm cluster. 
To use this set nimbus.authorizer to + * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer */ public static final String NIMBUS_USERS = "nimbus.users"; public static final Object NIMBUS_USERS_SCHEMA = ConfigValidation.StringsValidator; /** - * A list of groups , users belong to these groups are the only ones allowed to run user operation on storm cluster. - * To use this set nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer + * A list of groups , users belong to these groups are the only ones allowed to run user operation on storm cluster. To use this set nimbus.authorizer to + * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer */ public static final String NIMBUS_GROUPS = "nimbus.groups"; public static final Object NIMBUS_GROUPS_SCHEMA = ConfigValidation.StringsValidator; /** - * A list of users that run the supervisors and should be authorized to interact with - * nimbus as a supervisor would. To use this set - * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer + * A list of users that run the supervisors and should be authorized to interact with nimbus as a supervisor would. To use this set nimbus.authorizer to + * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer */ public static final String NIMBUS_SUPERVISOR_USERS = "nimbus.supervisor.users"; public static final Object NIMBUS_SUPERVISOR_USERS_SCHEMA = ConfigValidation.StringsValidator; @@ -374,83 +359,70 @@ public class Config extends HashMap<String, Object> { public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator; /** - * This parameter is used by the storm-deploy project to configure the - * jvm options for the nimbus daemon. + * This parameter is used by the storm-deploy project to configure the jvm options for the nimbus daemon. 
*/ public static final String NIMBUS_CHILDOPTS = "nimbus.childopts"; public static final Object NIMBUS_CHILDOPTS_SCHEMA = String.class; - /** - * How long without heartbeating a task can go before nimbus will consider the - * task dead and reassign it to another location. + * How long without heartbeating a task can go before nimbus will consider the task dead and reassign it to another location. */ public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs"; public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; - /** - * How often nimbus should wake up to check heartbeats and do reassignments. Note - * that if a machine ever goes down Nimbus will immediately wake up and take action. - * This parameter is for checking for failures when there's no explicit event like that - * occuring. + * How often nimbus should wake up to check heartbeats and do reassignments. Note that if a machine ever goes down Nimbus will immediately wake up and take + * action. This parameter is for checking for failures when there's no explicit event like that occurring. */ public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs"; public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** * How often nimbus should wake the cleanup thread to clean the inbox. - * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS */ public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs"; public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** * The length of time a jar file lives in the inbox before being deleted by the cleanup thread. - * - * Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS. 
- * Note that the time it takes to delete an inbox jar file is going to be somewhat more than - * NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS - * is set to). - * @see NIMBUS_CLEANUP_FREQ_SECS + * <p/> + * Probably keep this value greater than or equal to NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS. Note that the time it takes to delete an inbox jar file is + * going to be somewhat more than NIMBUS_CLEANUP_INBOX_JAR_EXPIRATION_SECS (depending on how often NIMBUS_CLEANUP_FREQ_SECS is set to). */ public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs"; public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * How long before a supervisor can go without heartbeating before nimbus considers it dead - * and stops assigning new work to it. + * How long before a supervisor can go without heartbeating before nimbus considers it dead and stops assigning new work to it. */ public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs"; public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * A special timeout used when a task is initially launched. During launch, this is the timeout - * used until the first heartbeat, overriding nimbus.task.timeout.secs. - * - * <p>A separate timeout exists for launch because there can be quite a bit of overhead - * to launching new JVM's and configuring them.</p> + * A special timeout used when a task is initially launched. During launch, this is the timeout used until the first heartbeat, overriding + * nimbus.task.timeout.secs. + * <p/> + * <p> + * A separate timeout exists for launch because there can be quite a bit of overhead to launching new JVM's and configuring them. 
+ * </p> */ public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs"; public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * Whether or not nimbus should reassign tasks if it detects that a task goes down. - * Defaults to true, and it's not recommended to change this value. + * Whether or not nimbus should reassign tasks if it detects that a task goes down. Defaults to true, and it's not recommended to change this value. */ public static final String NIMBUS_REASSIGN = "nimbus.reassign"; public static final Object NIMBUS_REASSIGN_SCHEMA = Boolean.class; /** - * During upload/download with the master, how long an upload or download connection is idle - * before nimbus considers it dead and drops the connection. + * During upload/download with the master, how long an upload or download connection is idle before nimbus considers it dead and drops the connection. */ public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs"; public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * A custom class that implements ITopologyValidator that is run whenever a - * topology is submitted. Can be used to provide business-specific logic for + * A custom class that implements ITopologyValidator that is run whenever a topology is submitted. Can be used to provide business-specific logic for * whether topologies are allowed to run or not. */ public static final String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator"; @@ -462,14 +434,12 @@ public class Config extends HashMap<String, Object> { public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer"; public static final Object NIMBUS_AUTHORIZER_SCHEMA = String.class; - /** * Impersonation user ACL config entries. 
*/ public static final String NIMBUS_IMPERSONATION_AUTHORIZER = "nimbus.impersonation.authorizer"; public static final Object NIMBUS_IMPERSONATION_AUTHORIZER_SCHEMA = String.class; - /** * Impersonation user ACL config entries. */ @@ -489,8 +459,7 @@ public class Config extends HashMap<String, Object> { public static final Object NIMBUS_CREDENTIAL_RENEWERS_SCHEMA = ConfigValidation.StringsValidator; /** - * A list of plugins that nimbus should load during submit topology to populate - * credentials on user's behalf. + * A list of plugins that nimbus should load during submit topology to populate credentials on user's behalf. */ public static final String NIMBUS_AUTO_CRED_PLUGINS = "nimbus.autocredential.plugins.classes"; public static final Object NIMBUS_AUTO_CRED_PLUGINS_SCHEMA = ConfigValidation.StringsValidator; @@ -592,8 +561,7 @@ public class Config extends HashMap<String, Object> { public static final Object UI_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class; /** - * Type of keystore used by Storm UI for setting up HTTPS (SSL). - * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details. + * Type of keystore used by Storm UI for setting up HTTPS (SSL). see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details. */ public static final String UI_HTTPS_KEYSTORE_TYPE = "ui.https.keystore.type"; public static final Object UI_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class; @@ -617,8 +585,8 @@ public class Config extends HashMap<String, Object> { public static final Object UI_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class; /** - * Type of truststore used by Storm UI for setting up HTTPS (SSL). - * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details. + * Type of truststore used by Storm UI for setting up HTTPS (SSL). see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more + * details. 
*/ public static final String UI_HTTPS_TRUSTSTORE_TYPE = "ui.https.truststore.type"; public static final Object UI_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class; @@ -632,7 +600,6 @@ public class Config extends HashMap<String, Object> { public static final String UI_HTTPS_NEED_CLIENT_AUTH = "ui.https.need.client.auth"; public static final Object UI_HTTPS_NEED_CLIENT_AUTH_SCHEMA = Boolean.class; - /** * List of DRPC servers so that the DRPCSpout knows who to talk to. */ @@ -664,8 +631,8 @@ public class Config extends HashMap<String, Object> { public static final Object DRPC_HTTPS_KEYSTORE_PASSWORD_SCHEMA = String.class; /** - * Type of keystore used by Storm DRPC for setting up HTTPS (SSL). - * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details. + * Type of keystore used by Storm DRPC for setting up HTTPS (SSL). see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more + * details. */ public static final String DRPC_HTTPS_KEYSTORE_TYPE = "drpc.https.keystore.type"; public static final Object DRPC_HTTPS_KEYSTORE_TYPE_SCHEMA = String.class; @@ -689,8 +656,8 @@ public class Config extends HashMap<String, Object> { public static final Object DRPC_HTTPS_TRUSTSTORE_PASSWORD_SCHEMA = String.class; /** - * Type of truststore used by Storm DRPC for setting up HTTPS (SSL). - * see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more details. + * Type of truststore used by Storm DRPC for setting up HTTPS (SSL). see http://docs.oracle.com/javase/7/docs/api/java/security/KeyStore.html for more + * details. */ public static final String DRPC_HTTPS_TRUSTSTORE_TYPE = "drpc.https.truststore.type"; public static final Object DRPC_HTTPS_TRUSTSTORE_TYPE_SCHEMA = String.class; @@ -724,26 +691,20 @@ public class Config extends HashMap<String, Object> { /** * The Access Control List for the DRPC Authorizer. 
- * @see DRPCSimpleAclAuthorizer */ public static final String DRPC_AUTHORIZER_ACL = "drpc.authorizer.acl"; public static final Object DRPC_AUTHORIZER_ACL_SCHEMA = Map.class; /** * File name of the DRPC Authorizer ACL. - * @see DRPCSimpleAclAuthorizer */ public static final String DRPC_AUTHORIZER_ACL_FILENAME = "drpc.authorizer.acl.filename"; public static final Object DRPC_AUTHORIZER_ACL_FILENAME_SCHEMA = String.class; /** - * Whether the DRPCSimpleAclAuthorizer should deny requests for operations - * involving functions that have no explicit ACL entry. When set to false - * (the default) DRPC functions that have no entry in the ACL will be - * permitted, which is appropriate for a development environment. When set - * to true, explicit ACL entries are required for every DRPC function, and - * any request for functions will be denied. - * @see DRPCSimpleAclAuthorizer + * Whether the DRPCSimpleAclAuthorizer should deny requests for operations involving functions that have no explicit ACL entry. When set to false (the + * default) DRPC functions that have no entry in the ACL will be permitted, which is appropriate for a development environment. When set to true, explicit + * ACL entries are required for every DRPC function, and any request for functions will be denied. */ public static final String DRPC_AUTHORIZER_ACL_STRICT = "drpc.authorizer.acl.strict"; public static final Object DRPC_AUTHORIZER_ACL_STRICT_SCHEMA = Boolean.class; @@ -785,11 +746,10 @@ public class Config extends HashMap<String, Object> { public static final Object DRPC_INVOCATIONS_THREADS_SCHEMA = Number.class; /** - * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also - * timeout based on the socket timeout on the DRPC client, and separately based on the topology message - * timeout for the topology implementing the DRPC function. + * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. 
Note that requests can also timeout based on the socket timeout on the DRPC + * client, and separately based on the topology message timeout for the topology implementing the DRPC function. */ - public static final String DRPC_REQUEST_TIMEOUT_SECS = "drpc.request.timeout.secs"; + public static final String DRPC_REQUEST_TIMEOUT_SECS = "drpc.request.timeout.secs"; public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** @@ -816,9 +776,8 @@ public class Config extends HashMap<String, Object> { public static final String SUPERVISOR_SCHEDULER_META = "supervisor.scheduler.meta"; public static final Object SUPERVISOR_SCHEDULER_META_SCHEMA = Map.class; /** - * A list of ports that can run workers on this supervisor. Each worker uses one port, and - * the supervisor will only run one worker per port. Use this configuration to tune - * how many workers run on each machine. + * A list of ports that can run workers on this supervisor. Each worker uses one port, and the supervisor will only run one worker per port. Use this + * configuration to tune how many workers run on each machine. 
*/ public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports"; public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.IntegersValidator; @@ -836,8 +795,7 @@ public class Config extends HashMap<String, Object> { public static final Object DRPC_HTTP_FILTER_SCHEMA = String.class; /** - * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP - * service + * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP service */ public static final String DRPC_HTTP_FILTER_PARAMS = "drpc.http.filter.params"; public static final Object DRPC_HTTP_FILTER_PARAMS_SCHEMA = Map.class; @@ -849,15 +807,13 @@ public class Config extends HashMap<String, Object> { public static final Object NIMBUS_EXECUTORS_PER_TOPOLOGY_SCHEMA = Number.class; /** - * This parameter is used by the storm-deploy project to configure the - * jvm options for the supervisor daemon. + * This parameter is used by the storm-deploy project to configure the jvm options for the supervisor daemon. */ public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts"; public static final Object SUPERVISOR_CHILDOPTS_SCHEMA = String.class; /** - * How long a worker can go without heartbeating before the supervisor tries to - * restart the worker process. + * How long a worker can go without heartbeating before the supervisor tries to restart the worker process. */ public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs"; public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; @@ -869,18 +825,15 @@ public class Config extends HashMap<String, Object> { public static final Object SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * How long a worker can go without heartbeating during the initial launch before - * the supervisor tries to restart the worker process. 
This value override - * supervisor.worker.timeout.secs during launch because there is additional - * overhead to starting and configuring the JVM on launch. + * How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process. This value override + * supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the JVM on launch. */ public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs"; public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * Whether or not the supervisor should launch workers assigned to it. Defaults - * to true -- and you should probably never change this value. This configuration - * is used in the Storm unit tests. + * Whether or not the supervisor should launch workers assigned to it. Defaults to true -- and you should probably never change this value. This + * configuration is used in the Storm unit tests. */ public static final String SUPERVISOR_ENABLE = "supervisor.enable"; public static final Object SUPERVISOR_ENABLE_SCHEMA = Boolean.class; @@ -891,42 +844,34 @@ public class Config extends HashMap<String, Object> { public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs"; public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator; - /** - * How often the supervisor checks the worker heartbeats to see if any of them - * need to be restarted. + * How often the supervisor checks the worker heartbeats to see if any of them need to be restarted. */ public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs"; public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * Should the supervior try to run the worker as the lauching user or not. 
Defaults to false. + * Should the supervisor try to run the worker as the launching user or not. Defaults to false. */ public static final String SUPERVISOR_RUN_WORKER_AS_USER = "supervisor.run.worker.as.user"; public static final Object SUPERVISOR_RUN_WORKER_AS_USER_SCHEMA = Boolean.class; /** - * Full path to the worker-laucher executable that will be used to lauch workers when - * SUPERVISOR_RUN_WORKER_AS_USER is set to true. + * Full path to the worker-launcher executable that will be used to launch workers when SUPERVISOR_RUN_WORKER_AS_USER is set to true. */ public static final String SUPERVISOR_WORKER_LAUNCHER = "supervisor.worker.launcher"; public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class; /** - * The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%" - * and "%WORKER-PORT%" substrings are replaced with: - * %ID% -> port (for backward compatibility), - * %WORKER-ID% -> worker-id, - * %TOPOLOGY-ID% -> topology-id, - * %WORKER-PORT% -> port. + * The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%" and "%WORKER-PORT%" substrings are replaced + * with: %ID% -> port (for backward compatibility), %WORKER-ID% -> worker-id, %TOPOLOGY-ID% -> topology-id, %WORKER-PORT% -> port. */ public static final String WORKER_CHILDOPTS = "worker.childopts"; public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator; /** - * The jvm opts provided to workers launched by this supervisor for GC. All "%ID%" substrings are replaced - * with an identifier for this worker. Because the JVM complains about multiple GC opts the topology - * can override this default value by setting topology.worker.gc.childopts. + * The jvm opts provided to workers launched by this supervisor for GC. All "%ID%" substrings are replaced with an identifier for this worker. 
Because the + * JVM complains about multiple GC opts the topology can override this default value by setting topology.worker.gc.childopts. */ public static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts"; public static final Object WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator; @@ -949,42 +894,37 @@ public class Config extends HashMap<String, Object> { public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs"; public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator; - /** - * How often a task should sync its connections with other tasks (if a task is - * reassigned, the other tasks sending messages to it need to refresh their connections). - * In general though, when a reassignment happens other tasks will be notified - * almost immediately. This configuration is here just in case that notification doesn't - * come through. + * How often a task should sync its connections with other tasks (if a task is reassigned, the other tasks sending messages to it need to refresh their + * connections). In general though, when a reassignment happens other tasks will be notified almost immediately. This configuration is here just in case + * that notification doesn't come through. */ public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs"; public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator; - /** * How often a task should sync credentials, worst case. */ public static final String TASK_CREDENTIALS_POLL_SECS = "task.credentials.poll.secs"; public static final Object TASK_CREDENTIALS_POLL_SECS_SCHEMA = Number.class; - /** - * A list of users that are allowed to interact with the topology. To use this set - * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer + * A list of users that are allowed to interact with the topology. 
To use this set nimbus.authorizer to + * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer */ public static final String TOPOLOGY_USERS = "topology.users"; public static final Object TOPOLOGY_USERS_SCHEMA = ConfigValidation.StringsValidator; /** - * A list of groups that are allowed to interact with the topology. To use this set - * nimbus.authorizer to backtype.storm.security.auth.authorizer.SimpleACLAuthorizer + * A list of groups that are allowed to interact with the topology. To use this set nimbus.authorizer to + * backtype.storm.security.auth.authorizer.SimpleACLAuthorizer */ public static final String TOPOLOGY_GROUPS = "topology.groups"; public static final Object TOPOLOGY_GROUPS_SCHEMA = ConfigValidation.StringsValidator; /** - * True if Storm should timeout messages or not. Defaults to true. This is meant to be used - * in unit tests to prevent tuples from being accidentally timed out during the test. + * True if Storm should timeout messages or not. Defaults to true. This is meant to be used in unit tests to prevent tuples from being accidentally timed + * out during the test. */ public static final String TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS = "topology.enable.message.timeouts"; public static final Object TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS_SCHEMA = Boolean.class; @@ -996,27 +936,22 @@ public class Config extends HashMap<String, Object> { public static final Object TOPOLOGY_DEBUG_SCHEMA = Boolean.class; /** - * The serializer for communication between shell components and non-JVM - * processes + * The serializer for communication between shell components and non-JVM processes */ public static final String TOPOLOGY_MULTILANG_SERIALIZER = "topology.multilang.serializer"; public static final Object TOPOLOGY_MULTILANG_SERIALIZER_SCHEMA = String.class; /** - * How many processes should be spawned around the cluster to execute this - * topology. Each process will execute some number of tasks as threads within - * them. 
This parameter should be used in conjunction with the parallelism hints - * on each component in the topology to tune the performance of a topology. + * How many processes should be spawned around the cluster to execute this topology. Each process will execute some number of tasks as threads within them. + * This parameter should be used in conjunction with the parallelism hints on each component in the topology to tune the performance of a topology. */ public static final String TOPOLOGY_WORKERS = "topology.workers"; public static final Object TOPOLOGY_WORKERS_SCHEMA = ConfigValidation.IntegerValidator; /** - * How many instances to create for a spout/bolt. A task runs on a thread with zero or more - * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always - * the same throughout the lifetime of a topology, but the number of executors (threads) for - * a spout/bolt can change over time. This allows a topology to scale to more or less resources - * without redeploying the topology or violating the constraints of Storm (such as a fields grouping + * How many instances to create for a spout/bolt. A task runs on a thread with zero or more other tasks for the same spout/bolt. The number of tasks for a + * spout/bolt is always the same throughout the lifetime of a topology, but the number of executors (threads) for a spout/bolt can change over time. This + * allows a topology to scale to more or less resources without redeploying the topology or violating the constraints of Storm (such as a fields grouping * guaranteeing that the same value goes to the same task). */ public static final String TOPOLOGY_TASKS = "topology.tasks"; @@ -1024,266 +959,242 @@ public class Config extends HashMap<String, Object> { /** * How many executors to spawn for ackers. 
- * - * <p>If this is set to 0, then Storm will immediately ack tuples as soon - * as they come off the spout, effectively disabling reliability.</p> + * <p/> + * <p> + * If this is set to 0, then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability. + * </p> */ public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors"; public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator; - /** - * The maximum amount of time given to the topology to fully process a message - * emitted by a spout. If the message is not acked within this time frame, Storm - * will fail the message on the spout. Some spouts implementations will then replay - * the message at a later time. + * The maximum amount of time given to the topology to fully process a message emitted by a spout. If the message is not acked within this time frame, Storm + * will fail the message on the spout. Some spouts implementations will then replay the message at a later time. */ public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs"; public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** - * A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ), - * the underlying serialization framework for Storm. A serialization can either - * be the name of a class (in which case Kryo will automatically create a serializer for the class - * that saves all the object's fields), or an implementation of com.esotericsoftware.kryo.Serializer. - * + * A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ), the underlying serialization framework for Storm. A serialization can + * either be the name of a class (in which case Kryo will automatically create a serializer for the class that saves all the object's fields), or an + * implementation of com.esotericsoftware.kryo.Serializer. 
+ * <p/> * See Kryo's documentation for more information about writing custom serializers. */ public static final String TOPOLOGY_KRYO_REGISTER = "topology.kryo.register"; public static final Object TOPOLOGY_KRYO_REGISTER_SCHEMA = ConfigValidation.KryoRegValidator; /** - * A list of classes that customize storm's kryo instance during start-up. - * Each listed class name must implement IKryoDecorator. During start-up the - * listed class is instantiated with 0 arguments, then its 'decorate' method - * is called with storm's kryo instance as the only argument. + * A list of classes that customize storm's kryo instance during start-up. Each listed class name must implement IKryoDecorator. During start-up the listed + * class is instantiated with 0 arguments, then its 'decorate' method is called with storm's kryo instance as the only argument. */ public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators"; public static final Object TOPOLOGY_KRYO_DECORATORS_SCHEMA = ConfigValidation.StringsValidator; /** - * Class that specifies how to create a Kryo instance for serialization. Storm will then apply - * topology.kryo.register and topology.kryo.decorators on top of this. The default implementation - * implements topology.fall.back.on.java.serialization and turns references off. + * Class that specifies how to create a Kryo instance for serialization. Storm will then apply topology.kryo.register and topology.kryo.decorators on top of + * this. The default implementation implements topology.fall.back.on.java.serialization and turns references off. */ public static final String TOPOLOGY_KRYO_FACTORY = "topology.kryo.factory"; public static final Object TOPOLOGY_KRYO_FACTORY_SCHEMA = String.class; - /** - * Whether or not Storm should skip the loading of kryo registrations for which it - * does not know the class or have the serializer implementation. Otherwise, the task will - * fail to load and will throw an error at runtime. 
The use case of this is if you want to - * declare your serializations on the storm.yaml files on the cluster rather than every single - * time you submit a topology. Different applications may use different serializations and so - * a single application may not have the code for the other serializers used by other apps. - * By setting this config to true, Storm will ignore that it doesn't have those other serializations - * rather than throw an error. + * Whether or not Storm should skip the loading of kryo registrations for which it does not know the class or have the serializer implementation. Otherwise, + * the task will fail to load and will throw an error at runtime. The use case of this is if you want to declare your serializations on the storm.yaml files + * on the cluster rather than every single time you submit a topology. Different applications may use different serializations and so a single application + * may not have the code for the other serializers used by other apps. By setting this config to true, Storm will ignore that it doesn't have those other + * serializations rather than throw an error. */ - public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations"; + public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS = "topology.skip.missing.kryo.registrations"; public static final Object TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS_SCHEMA = Boolean.class; /* - * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format). - * Each listed class will be routed all the metrics data generated by the storm metrics API. - * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. + * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format). Each listed class will be routed all the metrics data + * generated by the storm metrics API. 
Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. */ public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; public static final Object TOPOLOGY_METRICS_CONSUMER_REGISTER_SCHEMA = ConfigValidation.MapsValidator; - /** - * The maximum parallelism allowed for a component in this topology. This configuration is - * typically used in testing to limit the number of threads spawned in local mode. + * The maximum parallelism allowed for a component in this topology. This configuration is typically used in testing to limit the number of threads spawned + * in local mode. */ - public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism"; + public static final String TOPOLOGY_MAX_TASK_PARALLELISM = "topology.max.task.parallelism"; public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = ConfigValidation.IntegerValidator; - /** - * The maximum number of tuples that can be pending on a spout task at any given time. - * This config applies to individual tasks, not to spouts or topologies as a whole. - * - * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. - * Note that this config parameter has no effect for unreliable spouts that don't tag - * their tuples with a message id. + * The maximum number of tuples that can be pending on a spout task at any given time. This config applies to individual tasks, not to spouts or topologies + * as a whole. + * <p/> + * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet. Note that this config parameter has no effect for + * unreliable spouts that don't tag their tuples with a message id. 
*/ - public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; + public static final String TOPOLOGY_MAX_SPOUT_PENDING = "topology.max.spout.pending"; public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator; /** - * A class that implements a strategy for what to do when a spout needs to wait. Waiting is - * triggered in one of two conditions: - * - * 1. nextTuple emits no tuples - * 2. The spout has hit maxSpoutPending and can't emit any more tuples + * A class that implements a strategy for what to do when a spout needs to wait. Waiting is triggered in one of two conditions: + * <p/> + * 1. nextTuple emits no tuples 2. The spout has hit maxSpoutPending and can't emit any more tuples */ - public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy"; + public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY = "topology.spout.wait.strategy"; public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class; /** + * Configure the wait timeout used for timeout blocking wait strategy. + */ + public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT = "topology.disruptor.wait.timeout"; + public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_SCHEMA = Number.class; + + /** * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for. */ - public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms"; + public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS = "topology.sleep.spout.wait.strategy.time.ms"; public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = ConfigValidation.IntegerValidator; /** - * The maximum amount of time a component gives a source of state to synchronize before it requests - * synchronization again. + * The maximum amount of time a component gives a source of state to synchronize before it requests synchronization again. 
*/ - public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs"; + public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS = "topology.state.synchronization.timeout.secs"; public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** * The percentage of tuples to sample to produce stats for a task. */ - public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate"; + public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate"; public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = ConfigValidation.DoubleValidator; /** * The time period that builtin metrics data in bucketed into. */ - public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs"; + public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS = "topology.builtin.metrics.bucket.size.secs"; public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** * Whether or not to use Java serialization in a topology. */ - public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization"; + public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION = "topology.fall.back.on.java.serialization"; public static final Object TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION_SCHEMA = Boolean.class; /** + * Whether or not classes need to be registered for kryo serialization in a topology. + */ + public static final String TOPOLOGY_KRYO_REGISTER_REQUIRED = + "topology.kryo.register.required"; + public static final Object TOPOLOGY_KRYO_REGISTER_REQUIRED_SCHEMA = + Boolean.class; + + /** * Topology-specific options for the worker child process. This is used in addition to WORKER_CHILDOPTS. 
*/ - public static final String TOPOLOGY_WORKER_CHILDOPTS="topology.worker.childopts"; + public static final String TOPOLOGY_WORKER_CHILDOPTS = "topology.worker.childopts"; public static final Object TOPOLOGY_WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator; /** * Topology-specific options GC for the worker child process. This overrides WORKER_GC_CHILDOPTS. */ - public static final String TOPOLOGY_WORKER_GC_CHILDOPTS="topology.worker.gc.childopts"; + public static final String TOPOLOGY_WORKER_GC_CHILDOPTS = "topology.worker.gc.childopts"; public static final Object TOPOLOGY_WORKER_GC_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator; /** * Topology-specific classpath for the worker child process. This is combined to the usual classpath. */ - public static final String TOPOLOGY_CLASSPATH="topology.classpath"; + public static final String TOPOLOGY_CLASSPATH = "topology.classpath"; public static final Object TOPOLOGY_CLASSPATH_SCHEMA = ConfigValidation.StringOrStringListValidator; /** - * Topology-specific environment variables for the worker child process. - * This is added to the existing environment (that of the supervisor) + * Topology-specific environment variables for the worker child process. This is added to the existing environment (that of the supervisor) */ - public static final String TOPOLOGY_ENVIRONMENT="topology.environment"; - public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class; + public static final String TOPOLOGY_ENVIRONMENT = "topology.environment"; + public static final Object TOPOLOGY_ENVIRONMENT_SCHEMA = Map.class; /* - * Topology-specific option to disable/enable bolt's outgoing overflow buffer. - * Enabling this option ensures that the bolt can always clear the incoming messages, - * preventing live-lock for the topology with cyclic flow. - * The overflow buffer can fill degrading the performance gradually, - * eventually running out of memory. 
+ * Topology-specific option to disable/enable bolt's outgoing overflow buffer. Enabling this option ensures that the bolt can always clear the incoming + * messages, preventing live-lock for the topology with cyclic flow. The overflow buffer can fill degrading the performance gradually, eventually running + * out of memory. */ - public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE="topology.bolts.outgoing.overflow.buffer.enable"; + public static final String TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE = "topology.bolts.outgoing.overflow.buffer.enable"; public static final Object TOPOLOGY_BOLTS_OUTGOING_OVERFLOW_BUFFER_ENABLE_SCHEMA = Boolean.class; /** - * This config is available for TransactionalSpouts, and contains the id ( a String) for - * the transactional topology. This id is used to store the state of the transactional - * topology in Zookeeper. + * This config is available for TransactionalSpouts, and contains the id ( a String) for the transactional topology. This id is used to store the state of + * the transactional topology in Zookeeper. */ - public static final String TOPOLOGY_TRANSACTIONAL_ID="topology.transactional.id"; + public static final String TOPOLOGY_TRANSACTIONAL_ID = "topology.transactional.id"; public static final Object TOPOLOGY_TRANSACTIONAL_ID_SCHEMA = String.class; /** - * A list of task hooks that are automatically added to every spout and bolt in the topology. An example - * of when you'd do this is to add a hook that integrates with your internal - * monitoring system. These hooks are instantiated using the zero-arg constructor. + * A list of task hooks that are automatically added to every spout and bolt in the topology. An example of when you'd do this is to add a hook that + * integrates with your internal monitoring system. These hooks are instantiated using the zero-arg constructor. 
*/ - public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks"; + public static final String TOPOLOGY_AUTO_TASK_HOOKS = "topology.auto.task.hooks"; public static final Object TOPOLOGY_AUTO_TASK_HOOKS_SCHEMA = ConfigValidation.StringsValidator; - /** * The size of the Disruptor receive queue for each executor. Must be a power of 2. */ - public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE="topology.executor.receive.buffer.size"; + public static final String TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE = "topology.executor.receive.buffer.size"; public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator; /** - * The maximum number of messages to batch from the thread receiving off the network to the - * executor queues. Must be a power of 2. + * The maximum number of messages to batch from the thread receiving off the network to the executor queues. Must be a power of 2. */ - public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size"; + public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE = "topology.receiver.buffer.size"; public static final Object TOPOLOGY_RECEIVER_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator; /** * The size of the Disruptor send queue for each executor. Must be a power of 2. */ - public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE="topology.executor.send.buffer.size"; + public static final String TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE = "topology.executor.send.buffer.size"; public static final Object TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator; /** * The size of the Disruptor transfer queue for each worker. 
*/ - public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size"; + public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE = "topology.transfer.buffer.size"; public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator; - /** - * How often a tick tuple from the "__system" component and "__tick" stream should be sent - * to tasks. Meant to be used as a component-specific configuration. - */ - public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs"; + /** + * How often a tick tuple from the "__system" component and "__tick" stream should be sent to tasks. Meant to be used as a component-specific configuration. + */ + public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs"; public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator; - - /** - * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency - * vs. throughput - */ - public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy"; + /** + * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency vs. throughput + */ + public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY = "topology.disruptor.wait.strategy"; public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class; /** - * Configure the wait timeout used for timeout blocking wait strategy. + * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed via the TopologyContext. */ - public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT = - "topology.disruptor.wait.timeout"; - public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_SCHEMA = - Number.class; - - /* - * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed - * via the TopologyContext. 
- */ - public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size"; + public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE = "topology.worker.shared.thread.pool.size"; public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator; /** - * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, - * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be - * reported to Zookeeper per task for every 10 second interval of time. + * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, an interval of 10 seconds with + * topology.max.error.report.per.interval set to 5 will only allow 5 errors to be reported to Zookeeper per task for every 10 second interval of time. */ - public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs"; + public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS = "topology.error.throttle.interval.secs"; public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = ConfigValidation.IntegerValidator; /** * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS */ - public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval"; + public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL = "topology.max.error.report.per.interval"; public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator; - /** * How often a batch can be emitted in a Trident topology. 
*/ - public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis"; + public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS = "topology.trident.batch.emit.interval.millis"; public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = ConfigValidation.IntegerValidator; /** * Name of the topology. This config is automatically set by Storm when the topology is submitted. */ - public final static String TOPOLOGY_NAME="topology.name"; + public final static String TOPOLOGY_NAME = "topology.name"; public static final Object TOPOLOGY_NAME_SCHEMA = String.class; /** @@ -1313,33 +1224,31 @@ public class Config extends HashMap<String, Object> { /** * Max pending tuples in one ShellBolt */ - public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending"; + public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING = "topology.shellbolt.max.pending"; public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = ConfigValidation.IntegerValidator; /** * The root directory in ZooKeeper for metadata about TransactionalSpouts. */ - public static final String TRANSACTIONAL_ZOOKEEPER_ROOT="transactional.zookeeper.root"; + public static final String TRANSACTIONAL_ZOOKEEPER_ROOT = "transactional.zookeeper.root"; public static final Object TRANSACTIONAL_ZOOKEEPER_ROOT_SCHEMA = String.class; /** - * The list of zookeeper servers in which to keep the transactional state. If null (which is default), - * will use storm.zookeeper.servers + * The list of zookeeper servers in which to keep the transactional state. 
If null (which is default), will use storm.zookeeper.servers */ - public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS="transactional.zookeeper.servers"; + public static final String TRANSACTIONAL_ZOOKEEPER_SERVERS = "transactional.zookeeper.servers"; public static final Object TRANSACTIONAL_ZOOKEEPER_SERVERS_SCHEMA = ConfigValidation.StringsValidator; /** - * The port to use to connect to the transactional zookeeper servers. If null (which is default), - * will use storm.zookeeper.port + * The port to use to connect to the transactional zookeeper servers. If null (which is default), will use storm.zookeeper.port */ - public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port"; + public static final String TRANSACTIONAL_ZOOKEEPER_PORT = "transactional.zookeeper.port"; public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator; /** * The user as which the nimbus client should be acquired to perform the operation. */ - public static final String STORM_DO_AS_USER="storm.doAsUser"; + public static final String STORM_DO_AS_USER = "storm.doAsUser"; public static final Object STORM_DO_AS_USER_SCHEMA = String.class; /** @@ -1349,58 +1258,54 @@ public class Config extends HashMap<String, Object> { public static final Object ZMQ_THREADS_SCHEMA = ConfigValidation.IntegerValidator; /** - * How long a connection should retry sending messages to a target host when - * the connection is closed. This is an advanced configuration and can almost + * How long a connection should retry sending messages to a target host when the connection is closed. This is an advanced configuration and can almost * certainly be ignored. */ public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis"; public static final Object ZMQ_LINGER_MILLIS_SCHEMA = ConfigValidation.IntegerValidator; /** - * The high water for the ZeroMQ push sockets used for networking. 
Use this config to prevent buffer explosion - * on the networking layer. + * The high water for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion on the networking layer. */ public static final String ZMQ_HWM = "zmq.hwm"; public static final Object ZMQ_HWM_SCHEMA = ConfigValidation.IntegerValidator; /** - * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers) - * for the java.library.path value. java.library.path tells the JVM where - * to look for native libraries. It is necessary to set this config correctly since - * Storm uses the ZeroMQ and JZMQ native libs. + * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers) for the java.library.path value. java.library.path tells the JVM where to + * look for native libraries. It is necessary to set this config correctly since Storm uses the ZeroMQ and JZMQ native libs. */ public static final String JAVA_LIBRARY_PATH = "java.library.path"; public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class; /** - * The path to use as the zookeeper dir when running a zookeeper server via - * "storm dev-zookeeper". This zookeeper instance is only intended for development; + * The path to use as the zookeeper dir when running a zookeeper server via "storm dev-zookeeper". This zookeeper instance is only intended for development; * it is not a production grade zookeeper setup. */ public static final String DEV_ZOOKEEPER_PATH = "dev.zookeeper.path"; public static final Object DEV_ZOOKEEPER_PATH_SCHEMA = String.class; /** - * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler - * to backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler. + * A map from topology name to the number of machines that should be dedicated for that topology. Set storm.scheduler to + * backtype.storm.scheduler.IsolationScheduler to make use of the isolation scheduler. 
*/ public static final String ISOLATION_SCHEDULER_MACHINES = "isolation.scheduler.machines"; public static final Object ISOLATION_SCHEDULER_MACHINES_SCHEMA = ConfigValidation.MapOfStringToNumberValidator; /** - * A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler - * to backtype.storm.scheduler.multitenant.MultitenantScheduler + * A map from the user name to the number of machines that should that user is allowed to use. Set storm.scheduler to + * backtype.storm.scheduler.multitenant.MultitenantScheduler */ public static final String MULTITENANT_SCHEDULER_USER_POOLS = "multitenant.scheduler.user.pools"; public static final Object MULTITENANT_SCHEDULER_USER_POOLS_SCHEMA = ConfigValidation.MapOfStringToNumberValidator; /** - * The number of machines that should be used by this topology to isolate it from all others. Set storm.scheduler - * to backtype.storm.scheduler.multitenant.MultitenantScheduler + * The number of machines that should be used by this topology to isolate it from all others. 
Set storm.scheduler to + * backtype.storm.scheduler.multitenant.MultitenantScheduler */ public static final String TOPOLOGY_ISOLATED_MACHINES = "topology.isolate.machines"; public static final Object TOPOLOGY_ISOLATED_MACHINES_SCHEMA = Number.class; + public static void setClasspath(Map conf, String cp) { conf.put(Config.TOPOLOGY_CLASSPATH, cp); } @@ -1473,14 +1378,16 @@ public class Config extends HashMap<String, Object> { m.put("parallelism.hint", parallelismHint); m.put("argument", argument); - List l = (List)conf.get(TOPOLOGY_METRICS_CONSUMER_REGISTER); - if (l == null) { l = new ArrayList(); } + List l = (List) conf.get(TOPOLOGY_METRICS_CONSUMER_REGISTER); + if (l == null) { + l = new ArrayList(); + } l.add(m); conf.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l); } public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) { - registerMetricsConsumer(this, klass, argument, parallelismHint); + registerMetricsConsumer(this, klass, argument, parallelismHint); } public static void registerMetricsConsumer(Map conf, Class klass, long parallelismHint) { @@ -1520,7 +1427,7 @@ public class Config extends HashMap<String, Object> { } public void setSkipMissingKryoRegistrations(boolean skip) { - setSkipMissingKryoRegistrations(this, skip); + setSkipMissingKryoRegistrations(this, skip); } public static void setMaxTaskParallelism(Map conf, int max) { @@ -1557,7 +1464,7 @@ public class Config extends HashMap<String, Object> { private static List getRegisteredSerializations(Map conf) { List ret; - if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { + if (!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { ret = new ArrayList(); } else { ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_REGISTER)); @@ -1568,7 +1475,7 @@ public class Config extends HashMap<String, Object> { private static List getRegisteredDecorators(Map conf) { List ret; - if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) { + if 
(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) { ret = new ArrayList(); } else { ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)); @@ -1576,4 +1483,12 @@ public class Config extends HashMap<String, Object> { conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret); return ret; } + + public static void setKryoRegisterRequired(Map conf, boolean required) { + conf.put(Config.TOPOLOGY_KRYO_REGISTER_REQUIRED, required); + } + + public void setKryoRegisterRequired(boolean fallback) { + setKryoRegisterRequired(this, fallback); + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java b/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java index 24991d7..9fe2f69 100755 --- a/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java +++ b/jstorm-core/src/main/java/backtype/storm/ConfigValidation.java @@ -16,6 +16,7 @@ * limitations under the License. */ package backtype.storm; + import java.util.Map; import java.util.Map; @@ -31,13 +32,14 @@ public class ConfigValidation { public static interface FieldValidator { /** * Validates the given field. + * * @param name the name of the field. * @param field The field to be validated. * @throws IllegalArgumentException if the field fails validation. */ public void validateField(String name, Object field) throws IllegalArgumentException; } - + /** * Declares a method for validating configuration values that is nestable. */ @@ -46,9 +48,10 @@ public class ConfigValidation { public void validateField(String name, Object field) throws IllegalArgumentException { validateField(null, name, field); } - + /** * Validates the given field. + * * @param pd describes the parent wrapping this validator. * @param name the name of the field. * @param field The field to be validated. @@ -59,6 +62,7 @@ public class ConfigValidation { /** * Returns a new NestableFieldValidator for a given class. 
+ * * @param cls the Class the field should be a type of * @param nullAllowed whether or not a value of null is valid * @return a NestableFieldValidator for that class @@ -66,99 +70,93 @@ public class ConfigValidation { public static NestableFieldValidator fv(final Class cls, final boolean nullAllowed) { return new NestableFieldValidator() { @Override - public void validateField(String pd, String name, Object field) - throws IllegalArgumentException { + public void validateField(String pd, String name, Object field) throws IllegalArgumentException { if (nullAllowed && field == null) { return; } - if (! cls.isInstance(field)) { - throw new IllegalArgumentException( - pd + name + " must be a " + cls.getName() + ". ("+field+")"); + if (!cls.isInstance(field)) { + throw new IllegalArgumentException(pd + name + " must be a " + cls.getName() + ". (" + field + ")"); } } }; } - + /** * Returns a new NestableFieldValidator for a List of the given Class. + * * @param cls the Class of elements composing the list * @param nullAllowed whether or not a value of null is valid * @return a NestableFieldValidator for a list of the given class */ public static NestableFieldValidator listFv(Class cls, boolean nullAllowed) { - return listFv(fv(cls, false), nullAllowed); + return listFv(fv(cls, false), nullAllowed); } - + /** * Returns a new NestableFieldValidator for a List where each item is validated by validator. + * * @param validator used to validate each item in the list * @param nullAllowed whether or not a value of null is valid * @return a NestableFieldValidator for a list with each item validated by a different validator. 
*/ - public static NestableFieldValidator listFv(final NestableFieldValidator validator, - final boolean nullAllowed) { + public static NestableFieldValidator listFv(final NestableFieldValidator validator, final boolean nullAllowed) { return new NestableFieldValidator() { @Override - public void validateField(String pd, String name, Object field) - throws IllegalArgumentException { + public void validateField(String pd, String name, Object field) throws IllegalArgumentException { if (nullAllowed && field == null) { return; } if (field instanceof Iterable) { - for (Object e : (Iterable)field) { + for (Object e : (Iterable) field) { validator.validateField(pd + "Each element of the list ", name, e); } return; } - throw new IllegalArgumentException( - "Field " + name + " must be an Iterable but was " + - ((field == null) ? "null" : ("a " + field.getClass()))); + throw new IllegalArgumentException("Field " + name + " must be an Iterable but was " + ((field == null) ? "null" : ("a " + field.getClass()))); } }; } /** * Returns a new NestableFieldValidator for a Map of key to val. + * * @param key the Class of keys in the map * @param val the Class of values in the map * @param nullAllowed whether or not a value of null is valid * @return a NestableFieldValidator for a Map of key to val */ - public static NestableFieldValidator mapFv(Class key, Class val, - boolean nullAllowed) { + public static NestableFieldValidator mapFv(Class key, Class val, boolean nullAllowed) { return mapFv(fv(key, false), fv(val, false), nullAllowed); } - + /** * Returns a new NestableFieldValidator for a Map. 
+ * * @param key a validator for the keys in the map * @param val a validator for the values in the map * @param nullAllowed whether or not a value of null is valid * @return a NestableFieldValidator for a Map - */ - public static NestableFieldValidator mapFv(final NestableFieldValidator key, - final NestableFieldValidator val, final boolean nullAllowed) { + */ + public static NestableFieldValidator mapFv(final NestableFieldValidator key, final NestableFieldValidator val, final boolean nullAllowed) { return new NestableFieldValidator() { @SuppressWarnings("unchecked") @Override - public void validateField(String pd, String name, Object field) - throws IllegalArgumentException { + public void validateField(String pd, String name, Object field) throws IllegalArgumentException { if (nullAllowed && field == null) { return; } if (field instanceof Map) { - for (Map.Entry<Object, Object> entry: ((Map<Object, Object>)field).entrySet()) { - key.validateField("Each key of the map ", name, entry.getKey()); - val.validateField("Each value in the map ", name, entry.getValue()); + for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) field).entrySet()) { + key.validateField("Each key of the map ", name, entry.getKey()); + val.validateField("Each value in the map ", name, entry.getValue()); } return; } - throw new IllegalArgumentException( - "Field " + name + " must be a Map"); + throw new IllegalArgumentException("Field " + name + " must be a Map"); } }; } - + /** * Validates a list of Numbers. */ @@ -175,8 +173,7 @@ public class ConfigValidation { public static Object MapOfStringToNumberValidator = mapFv(String.class, Number.class, true); /** - * Validates a map of Strings to a map of Strings to a list. - * {str -> {str -> [str,str]} + * Validates a map of Strings to a map of Strings to a list. 
{str -> {str -> [str,str]} */ public static Object MapOfStringToMapValidator = mapFv(fv(String.class, false), mapFv(fv(String.class, false), listFv(String.class, false), false), true); @@ -196,8 +193,7 @@ public class ConfigValidation { return; } final long i; - if (o instanceof Number && - (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) { + if (o instanceof Number && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) { return; } @@ -212,22 +208,19 @@ public class ConfigValidation { */ public static Object IntegersValidator = new FieldValidator() { @Override - public void validateField(String name, Object field) - throws IllegalArgumentException { + public void validateField(String name, Object field) throws IllegalArgumentException { if (field == null) { // A null value is acceptable. return; } if (field instanceof Iterable) { - for (Object o : (Iterable)field) { + for (Object o : (Iterable) field) { final long i; - if (o instanceof Number && - ((i = ((Number)o).longValue()) == ((Number)o).doubleValue()) && - (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE)) { + if (o instanceof Number && ((i = ((Number) o).longValue()) == ((Number) o).doubleValue()) + && (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE)) { // pass the test } else { - throw new IllegalArgumentException( - "Each element of the list " + name + " must be an Integer within type range."); + throw new IllegalArgumentException("Each element of the list " + name + " must be an Integer within type range."); } } return; @@ -266,11 +259,9 @@ public class ConfigValidation { return; } final long i; - if (o instanceof Number && - (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) - { + if (o instanceof Number && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { // Test whether the integer is a power of 2. 
- if (i > 0 && (i & (i-1)) == 0) { + if (i > 0 && (i & (i - 1)) == 0) { return; } } @@ -289,9 +280,7 @@ public class ConfigValidation { return; } final long i; - if (o instanceof Number && - (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) - { + if (o instanceof Number && (i = ((Number) o).longValue()) == ((Number) o).doubleValue()) { if (i > 0) { return; } @@ -311,24 +300,20 @@ public class ConfigValidation { return; } if (o instanceof Iterable) { - for (Object e : (Iterable)o) { + for (Object e : (Iterable) o) { if (e instanceof Map) { - for (Map.Entry<Object,Object> entry: ((Map<Object,Object>)e).entrySet()) { - if (!(entry.getKey() instanceof String) || - !(entry.getValue() instanceof String)) { - throw new IllegalArgumentException( - "Each element of the list " + name + " must be a String or a Map of Strings"); + for (Map.Entry<Object, Object> entry : ((Map<Object, Object>) e).entrySet()) { + if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof String)) { + throw new IllegalArgumentException("Each element of the list " + name + " must be a String or a Map of Strings"); } } } else if (!(e instanceof String)) { - throw new IllegalArgumentException( - "Each element of the list " + name + " must be a String or a Map of Strings"); + throw new IllegalArgumentException("Each element of the list " + name + " must be a String or a Map of Strings"); } } return; } - throw new IllegalArgumentException( - "Field " + name + " must be an Iterable containing only Strings or Maps of Strings"); + throw new IllegalArgumentException("Field " + name + " must be an Iterable containing only Strings or Maps of Strings"); } };
