http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java b/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java index 26a068c..333d3bb 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/client/ConfigExtension.java @@ -17,35 +17,55 @@ */ package com.alibaba.jstorm.client; +import backtype.storm.Config; +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.utils.JStormUtils; +import org.apache.commons.lang.StringUtils; + import java.security.InvalidParameterException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; - -import backtype.storm.Config; -import backtype.storm.utils.Utils; - -import com.alibaba.jstorm.utils.JStormUtils; - public class ConfigExtension { /** - * if this configure has been set, the spout or bolt will log all receive - * tuples - * + * if this configure has been set, the spout or bolt will log all receive tuples + * <p/> * topology.debug just for logging all sent tuples */ - protected static final String TOPOLOGY_DEBUG_RECV_TUPLE = - "topology.debug.recv.tuple"; + protected static final String TOPOLOGY_DEBUG_RECV_TUPLE = "topology.debug.recv.tuple"; public static void setTopologyDebugRecvTuple(Map conf, boolean debug) { conf.put(TOPOLOGY_DEBUG_RECV_TUPLE, Boolean.valueOf(debug)); } public static Boolean isTopologyDebugRecvTuple(Map conf) { - return JStormUtils.parseBoolean(conf.get(TOPOLOGY_DEBUG_RECV_TUPLE), - false); + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_DEBUG_RECV_TUPLE), false); + } + + private static final String TOPOLOGY_ENABLE_METRIC_DEBUG = "topology.enable.metric.debug"; + + public static boolean isEnableMetricDebug(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_METRIC_DEBUG), false); + } + + private static final String TOPOLOGY_DEBUG_METRIC_NAMES = "topology.debug.metric.names"; + + public static String getDebugMetricNames(Map conf) { + String metrics = (String) conf.get(TOPOLOGY_DEBUG_METRIC_NAMES); + if (metrics == null) { + return ""; + } + return metrics; + } + + /** + * metrics switch, ONLY for performance test, DO NOT set it to false in production + */ + private static final String TOPOLOGY_ENABLE_METRICS = "topology.enable.metrics"; + + public static boolean isEnableMetrics(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_METRICS), true); } /** @@ -53,27 +73,20 @@ public class ConfigExtension { */ private static final Integer DEFAULT_DEAMON_HTTPSERVER_PORT = 7621; - protected static final String SUPERVISOR_DEAMON_HTTPSERVER_PORT = - "supervisor.deamon.logview.port"; + protected static final String SUPERVISOR_DEAMON_HTTPSERVER_PORT = "supervisor.deamon.logview.port"; public static Integer getSupervisorDeamonHttpserverPort(Map conf) { - return JStormUtils.parseInt( - conf.get(SUPERVISOR_DEAMON_HTTPSERVER_PORT), - DEFAULT_DEAMON_HTTPSERVER_PORT + 1); + return JStormUtils.parseInt(conf.get(SUPERVISOR_DEAMON_HTTPSERVER_PORT), DEFAULT_DEAMON_HTTPSERVER_PORT + 1); } - protected static final String NIMBUS_DEAMON_HTTPSERVER_PORT = - "nimbus.deamon.logview.port"; + protected static final String NIMBUS_DEAMON_HTTPSERVER_PORT = "nimbus.deamon.logview.port"; public static Integer getNimbusDeamonHttpserverPort(Map conf) { - return JStormUtils.parseInt(conf.get(NIMBUS_DEAMON_HTTPSERVER_PORT), - DEFAULT_DEAMON_HTTPSERVER_PORT); + return JStormUtils.parseInt(conf.get(NIMBUS_DEAMON_HTTPSERVER_PORT), DEFAULT_DEAMON_HTTPSERVER_PORT); } /** * Worker gc parameter - * - * */ protected static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts"; @@ -85,8 +98,7 @@ public class ConfigExtension { return (String) conf.get(WORKER_GC_CHILDOPTS); } - protected static final String WOREKER_REDIRECT_OUTPUT = - "worker.redirect.output"; + protected static final String WOREKER_REDIRECT_OUTPUT = "worker.redirect.output"; public static boolean getWorkerRedirectOutput(Map conf) { Object result = conf.get(WOREKER_REDIRECT_OUTPUT); @@ -95,8 +107,7 @@ public class ConfigExtension { return (Boolean) result; } - protected static final String WOREKER_REDIRECT_OUTPUT_FILE = - "worker.redirect.output.file"; + protected static final String WOREKER_REDIRECT_OUTPUT_FILE = "worker.redirect.output.file"; public static void setWorkerRedirectOutputFile(Map conf, String outputPath) { conf.put(WOREKER_REDIRECT_OUTPUT_FILE, outputPath); @@ -107,9 +118,8 @@ public class ConfigExtension { } /** - * Usually, spout finish prepare before bolt, so spout need wait several - * seconds so that bolt finish preparation - * + * Usually, spout finish prepare before bolt, so spout need wait several seconds so that bolt finish preparation + * <p/> * By default, the setting is 30 seconds */ protected static final String SPOUT_DELAY_RUN = "spout.delay.run"; @@ -154,32 +164,26 @@ public class ConfigExtension { } /** - * if the setting has been set, the component's task must run different node - * This is conflict with USE_SINGLE_NODE + * if the setting has been set, the component's task must run different node This is conflict with USE_SINGLE_NODE */ - protected static final String TASK_ON_DIFFERENT_NODE = - "task.on.differ.node"; + protected static final String TASK_ON_DIFFERENT_NODE = "task.on.differ.node"; public static void setTaskOnDifferentNode(Map conf, boolean isIsolate) { conf.put(TASK_ON_DIFFERENT_NODE, Boolean.valueOf(isIsolate)); } public static boolean isTaskOnDifferentNode(Map conf) { - return JStormUtils - .parseBoolean(conf.get(TASK_ON_DIFFERENT_NODE), false); + return JStormUtils.parseBoolean(conf.get(TASK_ON_DIFFERENT_NODE), false); } - protected static final String SUPERVISOR_ENABLE_CGROUP = - "supervisor.enable.cgroup"; + protected static final String SUPERVISOR_ENABLE_CGROUP = "supervisor.enable.cgroup"; public static boolean isEnableCgroup(Map conf) { - return JStormUtils.parseBoolean(conf.get(SUPERVISOR_ENABLE_CGROUP), - false); + return JStormUtils.parseBoolean(conf.get(SUPERVISOR_ENABLE_CGROUP), false); } /** - * If component or topology configuration set "use.old.assignment", will try - * use old assignment firstly + * If component or topology configuration set "use.old.assignment", will try use old assignment firstly */ protected static final String USE_OLD_ASSIGNMENT = "use.old.assignment"; @@ -213,12 +217,10 @@ public class ConfigExtension { return JStormUtils.parseBoolean(conf.get(NIMBUS_USE_IP), false); } - protected static final String TOPOLOGY_ENABLE_CLASSLOADER = - "topology.enable.classloader"; + protected static final String TOPOLOGY_ENABLE_CLASSLOADER = "topology.enable.classloader"; public static boolean isEnableTopologyClassLoader(Map conf) { - return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_CLASSLOADER), - false); + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_CLASSLOADER), false); } public static void setEnableTopologyClassLoader(Map conf, boolean enable) { @@ -235,14 +237,10 @@ public class ConfigExtension { conf.put(CLASSLOADER_DEBUG, enable); } - protected static final String CONTAINER_NIMBUS_HEARTBEAT = - "container.nimbus.heartbeat"; + protected static final String CONTAINER_NIMBUS_HEARTBEAT = "container.nimbus.heartbeat"; /** * Get to know whether nimbus is run under Apsara/Yarn container - * - * @param conf - * @return */ public static boolean isEnableContainerNimbus() { String path = System.getenv(CONTAINER_NIMBUS_HEARTBEAT); @@ -256,23 +254,15 @@ public class ConfigExtension { /** * Get Apsara/Yarn nimbus container's hearbeat dir - * - * @param conf - * @return */ public static String getContainerNimbusHearbeat() { return System.getenv(CONTAINER_NIMBUS_HEARTBEAT); } - protected static final String CONTAINER_SUPERVISOR_HEARTBEAT = - "container.supervisor.heartbeat"; + protected static final String CONTAINER_SUPERVISOR_HEARTBEAT = "container.supervisor.heartbeat"; /** - * Get to know whether supervisor is run under Apsara/Yarn supervisor - * container - * - * @param conf - * @return + * Get to know whether supervisor is run under Apsara/Yarn supervisor container */ public static boolean isEnableContainerSupervisor() { String path = System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT); @@ -286,28 +276,21 @@ public class ConfigExtension { /** * Get Apsara/Yarn supervisor container's hearbeat dir - * - * @param conf - * @return */ public static String getContainerSupervisorHearbeat() { return (String) System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT); } - protected static final String CONTAINER_HEARTBEAT_TIMEOUT_SECONDS = - "container.heartbeat.timeout.seconds"; + protected static final String CONTAINER_HEARTBEAT_TIMEOUT_SECONDS = "container.heartbeat.timeout.seconds"; public static int getContainerHeartbeatTimeoutSeconds(Map conf) { - return JStormUtils.parseInt( - conf.get(CONTAINER_HEARTBEAT_TIMEOUT_SECONDS), 240); + return JStormUtils.parseInt(conf.get(CONTAINER_HEARTBEAT_TIMEOUT_SECONDS), 240); } - protected static final String CONTAINER_HEARTBEAT_FREQUENCE = - "container.heartbeat.frequence"; + protected static final String CONTAINER_HEARTBEAT_FREQUENCE = "container.heartbeat.frequence"; public static int getContainerHeartbeatFrequence(Map conf) { - return JStormUtils - .parseInt(conf.get(CONTAINER_HEARTBEAT_FREQUENCE), 10); + return JStormUtils.parseInt(conf.get(CONTAINER_HEARTBEAT_FREQUENCE), 10); } protected static final String JAVA_SANDBOX_ENABLE = "java.sandbox.enable"; @@ -326,12 +309,10 @@ public class ConfigExtension { conf.put(SPOUT_SINGLE_THREAD, enable); } - protected static String WORKER_STOP_WITHOUT_SUPERVISOR = - "worker.stop.without.supervisor"; + protected static String WORKER_STOP_WITHOUT_SUPERVISOR = "worker.stop.without.supervisor"; public static boolean isWorkerStopWithoutSupervisor(Map conf) { - return JStormUtils.parseBoolean( - conf.get(WORKER_STOP_WITHOUT_SUPERVISOR), false); + return JStormUtils.parseBoolean(conf.get(WORKER_STOP_WITHOUT_SUPERVISOR), false); } protected static String CGROUP_ROOT_DIR = "supervisor.cgroup.rootdir"; @@ -340,33 +321,15 @@ public class ConfigExtension { return (String) conf.get(CGROUP_ROOT_DIR); } - protected static String NETTY_TRANSFER_ASYNC_AND_BATCH = - "storm.messaging.netty.transfer.async.batch"; + protected static String NETTY_TRANSFER_ASYNC_AND_BATCH = "storm.messaging.netty.transfer.async.batch"; public static boolean isNettyTransferAsyncBatch(Map conf) { - return JStormUtils.parseBoolean( - conf.get(NETTY_TRANSFER_ASYNC_AND_BATCH), true); + return JStormUtils.parseBoolean(conf.get(NETTY_TRANSFER_ASYNC_AND_BATCH), true); } - - protected static String NETTY_PENDING_BUFFER_TIMEOUT = - "storm.messaging.netty.pending.buffer.timeout"; - public static void setNettyPendingBufferTimeout(Map conf, Long timeout) { - conf.put(NETTY_PENDING_BUFFER_TIMEOUT, timeout); - } - - public static long getNettyPendingBufferTimeout(Map conf) { - int messageTimeout = JStormUtils.parseInt( - conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 120); - return JStormUtils.parseLong( - conf.get(NETTY_PENDING_BUFFER_TIMEOUT), messageTimeout*1000); - } + protected static final String USE_USERDEFINE_ASSIGNMENT = "use.userdefine.assignment"; - protected static final String USE_USERDEFINE_ASSIGNMENT = - "use.userdefine.assignment"; - - public static void setUserDefineAssignment(Map conf, - List<WorkerAssignment> userDefines) { + public static void setUserDefineAssignment(Map conf, List<WorkerAssignment> userDefines) { List<String> ret = new ArrayList<String>(); for (WorkerAssignment worker : userDefines) { ret.add(Utils.to_json(worker)); @@ -384,6 +347,17 @@ public class ConfigExtension { return ret; } + protected static String NETTY_PENDING_BUFFER_TIMEOUT = "storm.messaging.netty.pending.buffer.timeout"; + + public static void setNettyPendingBufferTimeout(Map conf, Long timeout) { + conf.put(NETTY_PENDING_BUFFER_TIMEOUT, timeout); + } + + public static long getNettyPendingBufferTimeout(Map conf) { + int messageTimeout = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 120); + return JStormUtils.parseLong(conf.get(NETTY_PENDING_BUFFER_TIMEOUT), messageTimeout * 1000); + } + protected static final String MEMSIZE_PER_WORKER = "worker.memory.size"; public static void setMemSizePerWorker(Map conf, long memSize) { @@ -406,12 +380,25 @@ public class ConfigExtension { } public static long getMemSizePerWorker(Map conf) { - long size = - JStormUtils.parseLong(conf.get(MEMSIZE_PER_WORKER), - JStormUtils.SIZE_1_G * 2); + long size = JStormUtils.parseLong(conf.get(MEMSIZE_PER_WORKER), JStormUtils.SIZE_1_G * 2); return size > 0 ? size : JStormUtils.SIZE_1_G * 2; } + protected static final String MIN_MEMSIZE_PER_WORKER = "worker.memory.min.size"; + + public static void setMemMinSizePerWorker(Map conf, long memSize) { + conf.put(MIN_MEMSIZE_PER_WORKER, memSize); + } + + public static long getMemMinSizePerWorker(Map conf) { + long maxMemSize = getMemSizePerWorker(conf); + + Long size = JStormUtils.parseLong(conf.get(MIN_MEMSIZE_PER_WORKER)); + long minMemSize = (size == null || size == 0) ? maxMemSize : size; + + return minMemSize; + } + protected static final String CPU_SLOT_PER_WORKER = "worker.cpu.slot.num"; public static void setCpuSlotNumPerWorker(Map conf, int slotNum) { @@ -423,39 +410,34 @@ public class ConfigExtension { return slot > 0 ? slot : 1; } - protected static String TOPOLOGY_PERFORMANCE_METRICS = - "topology.performance.metrics"; + protected static String TOPOLOGY_PERFORMANCE_METRICS = "topology.performance.metrics"; public static boolean isEnablePerformanceMetrics(Map conf) { - return JStormUtils.parseBoolean(conf.get(TOPOLOGY_PERFORMANCE_METRICS), - true); + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_PERFORMANCE_METRICS), true); } public static void setPerformanceMetrics(Map conf, boolean isEnable) { conf.put(TOPOLOGY_PERFORMANCE_METRICS, isEnable); } - protected static String NETTY_BUFFER_THRESHOLD_SIZE = - "storm.messaging.netty.buffer.threshold"; + protected static String NETTY_BUFFER_THRESHOLD_SIZE = "storm.messaging.netty.buffer.threshold"; public static long getNettyBufferThresholdSize(Map conf) { - return JStormUtils.parseLong(conf.get(NETTY_BUFFER_THRESHOLD_SIZE), - 8 * JStormUtils.SIZE_1_M); + return JStormUtils.parseLong(conf.get(NETTY_BUFFER_THRESHOLD_SIZE), 8 * JStormUtils.SIZE_1_M); } public static void setNettyBufferThresholdSize(Map conf, long size) { conf.put(NETTY_BUFFER_THRESHOLD_SIZE, size); } - protected static String NETTY_MAX_SEND_PENDING = - "storm.messaging.netty.max.pending"; + protected static String NETTY_MAX_SEND_PENDING = "storm.messaging.netty.max.pending"; public static void setNettyMaxSendPending(Map conf, long pending) { conf.put(NETTY_MAX_SEND_PENDING, pending); } public static long getNettyMaxSendPending(Map conf) { - return JStormUtils.parseLong(conf.get(NETTY_MAX_SEND_PENDING), 4); + return JStormUtils.parseLong(conf.get(NETTY_MAX_SEND_PENDING), 16); } protected static String DISRUPTOR_USE_SLEEP = "disruptor.use.sleep"; @@ -469,9 +451,7 @@ public class ConfigExtension { } public static boolean isTopologyContainAcker(Map conf) { - int num = - JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), - 1); + int num = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), 1); if (num > 0) { return true; } else { @@ -489,8 +469,7 @@ public class ConfigExtension { conf.put(NETTY_SYNC_MODE, sync); } - protected static String NETTY_ASYNC_BLOCK = - "storm.messaging.netty.async.block"; + protected static String NETTY_ASYNC_BLOCK = "storm.messaging.netty.async.block"; public static boolean isNettyASyncBlock(Map conf) { return JStormUtils.parseBoolean(conf.get(NETTY_ASYNC_BLOCK), true); @@ -500,20 +479,17 @@ public class ConfigExtension { conf.put(NETTY_ASYNC_BLOCK, block); } - protected static String ALIMONITOR_METRICS_POST = - "topology.alimonitor.metrics.post"; + protected static String ALIMONITOR_METRICS_POST = "topology.alimonitor.metrics.post"; public static boolean isAlimonitorMetricsPost(Map conf) { - return JStormUtils - .parseBoolean(conf.get(ALIMONITOR_METRICS_POST), true); + return JStormUtils.parseBoolean(conf.get(ALIMONITOR_METRICS_POST), true); } public static void setAlimonitorMetricsPost(Map conf, boolean post) { conf.put(ALIMONITOR_METRICS_POST, post); } - public static String TASK_CLEANUP_TIMEOUT_SEC = - "task.cleanup.timeout.sec"; + public static String TASK_CLEANUP_TIMEOUT_SEC = "task.cleanup.timeout.sec"; public static int getTaskCleanupTimeoutSec(Map conf) { return JStormUtils.parseInt(conf.get(TASK_CLEANUP_TIMEOUT_SEC), 10); @@ -566,6 +542,7 @@ public class ConfigExtension { return JStormUtils.parseInt(uiCluster.get(UI_CLUSTER_ZK_PORT)); } + protected static String SPOUT_PEND_FULL_SLEEP = "spout.pending.full.sleep"; public static boolean isSpoutPendFullSleep(Map conf) { @@ -577,8 +554,7 @@ public class ConfigExtension { } - protected static String LOGVIEW_ENCODING = - "supervisor.deamon.logview.encoding"; + protected static String LOGVIEW_ENCODING = "supervisor.deamon.logview.encoding"; protected static String UTF8 = "utf-8"; public static String getLogViewEncoding(Map conf) { @@ -603,16 +579,13 @@ public class ConfigExtension { } public static String TASK_STATUS_ACTIVE = "Active"; + public static String TASK_STATUS_INACTIVE = "Inactive"; public static String TASK_STATUS_STARTING = "Starting"; - protected static String ALIMONITOR_TOPO_METIRC_NAME = - "topology.alimonitor.topo.metrics.name"; - protected static String ALIMONITOR_TASK_METIRC_NAME = - "topology.alimonitor.task.metrics.name"; - protected static String ALIMONITOR_WORKER_METIRC_NAME = - "topology.alimonitor.worker.metrics.name"; - protected static String ALIMONITOR_USER_METIRC_NAME = - "topology.alimonitor.user.metrics.name"; + protected static String ALIMONITOR_TOPO_METIRC_NAME = "topology.alimonitor.topo.metrics.name"; + protected static String ALIMONITOR_TASK_METIRC_NAME = "topology.alimonitor.task.metrics.name"; + protected static String ALIMONITOR_WORKER_METIRC_NAME = "topology.alimonitor.worker.metrics.name"; + protected static String ALIMONITOR_USER_METIRC_NAME = "topology.alimonitor.user.metrics.name"; public static String getAlmonTopoMetricName(Map conf) { return (String) conf.get(ALIMONITOR_TOPO_METIRC_NAME); @@ -635,8 +608,7 @@ public class ConfigExtension { public static Integer getSpoutParallelism(Map conf, String componentName) { Integer ret = null; - Map<String, String> map = - (Map<String, String>) (conf.get(SPOUT_PARALLELISM)); + Map<String, String> map = (Map<String, String>) (conf.get(SPOUT_PARALLELISM)); if (map != null) ret = JStormUtils.parseInt(map.get(componentName)); return ret; @@ -644,15 +616,13 @@ public class ConfigExtension { public static Integer getBoltParallelism(Map conf, String componentName) { Integer ret = null; - Map<String, String> map = - (Map<String, String>) (conf.get(BOLT_PARALLELISM)); + Map<String, String> map = (Map<String, String>) (conf.get(BOLT_PARALLELISM)); if (map != null) ret = JStormUtils.parseInt(map.get(componentName)); return ret; } - protected static String TOPOLOGY_BUFFER_SIZE_LIMITED = - "topology.buffer.size.limited"; + protected static String TOPOLOGY_BUFFER_SIZE_LIMITED = "topology.buffer.size.limited"; public static void setTopologyBufferSizeLimited(Map conf, boolean limited) { conf.put(TOPOLOGY_BUFFER_SIZE_LIMITED, limited); @@ -664,30 +634,38 @@ public class ConfigExtension { return true; } - return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), - true); + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), true); } - protected static String SUPERVISOR_SLOTS_PORTS_BASE = - "supervisor.slots.ports.base"; + protected static String SUPERVISOR_SLOTS_PORTS_BASE = "supervisor.slots.ports.base"; public static int getSupervisorSlotsPortsBase(Map conf) { - return JStormUtils - .parseInt(conf.get(SUPERVISOR_SLOTS_PORTS_BASE), 6800); + return JStormUtils.parseInt(conf.get(SUPERVISOR_SLOTS_PORTS_BASE), 6800); } // SUPERVISOR_SLOTS_PORTS_BASE don't provide setting function, it must be // set by configuration - protected static String SUPERVISOR_SLOTS_PORT_CPU_WEIGHT = - "supervisor.slots.port.cpu.weight"; + protected static String SUPERVISOR_SLOTS_PORT_CPU_WEIGHT = "supervisor.slots.port.cpu.weight"; public static double getSupervisorSlotsPortCpuWeight(Map conf) { Object value = conf.get(SUPERVISOR_SLOTS_PORT_CPU_WEIGHT); Double ret = JStormUtils.convertToDouble(value); - if (ret == null) { - return 1.0; + if (ret == null || ret <= 0) { + return 1.2; + } else { + return ret; + } + } + + protected static String SUPERVISOR_SLOTS_PORT_MEM_WEIGHT = "supervisor.slots.port.mem.weight"; + + public static double getSupervisorSlotsPortMemWeight(Map conf) { + Object value = conf.get(SUPERVISOR_SLOTS_PORT_MEM_WEIGHT); + Double ret = JStormUtils.convertToDouble(value); + if (ret == null || ret <= 0) { + return 0.7; } else { return ret; } @@ -706,8 +684,7 @@ public class ConfigExtension { conf.put(USER_DEFINED_LOG4J_CONF, fileName); } - protected static String USER_DEFINED_LOGBACK_CONF = - "user.defined.logback.conf"; + protected static String USER_DEFINED_LOGBACK_CONF = "user.defined.logback.conf"; public static String getUserDefinedLogbackConf(Map conf) { return (String) conf.get(USER_DEFINED_LOGBACK_CONF); @@ -717,12 +694,10 @@ public class ConfigExtension { conf.put(USER_DEFINED_LOGBACK_CONF, fileName); } - protected static String TASK_ERROR_INFO_REPORT_INTERVAL = - "topology.task.error.report.interval"; + protected static String TASK_ERROR_INFO_REPORT_INTERVAL = "topology.task.error.report.interval"; public static Integer getTaskErrorReportInterval(Map conf) { - return JStormUtils.parseInt(conf.get(TASK_ERROR_INFO_REPORT_INTERVAL), - 60); + return JStormUtils.parseInt(conf.get(TASK_ERROR_INFO_REPORT_INTERVAL), 60); } public static void setTaskErrorReportInterval(Map conf, Integer interval) { @@ -739,18 +714,16 @@ public class ConfigExtension { conf.put(DEFAULT_CACHE_TIMEOUT, timeout); } - protected static String WORKER_MERTRIC_REPORT_FREQUENCY = - "worker.metric.report.frequency.secs"; + protected static String WORKER_MERTRIC_REPORT_CHECK_FREQUENCY = "worker.metric.report.frequency.secs"; - public static int getWorkerMetricReportFrequency(Map conf) { - return JStormUtils.parseInt(conf.get(WORKER_MERTRIC_REPORT_FREQUENCY), - 60); + public static int getWorkerMetricReportCheckFrequency(Map conf) { + return JStormUtils.parseInt(conf.get(WORKER_MERTRIC_REPORT_CHECK_FREQUENCY), 60); } public static void setWorkerMetricReportFrequency(Map conf, int frequence) { - conf.put(WORKER_MERTRIC_REPORT_FREQUENCY, frequence); + conf.put(WORKER_MERTRIC_REPORT_CHECK_FREQUENCY, frequence); } - + /** * Store local worker port/workerId/supervisorId to configuration */ @@ -767,7 +740,7 @@ public class ConfigExtension { } public static String getLocalWorkerId(Map conf) { - return (String)conf.get(LOCLA_WORKER_ID); + return (String) conf.get(LOCLA_WORKER_ID); } public static void setLocalWorkerId(Map conf, String workerId) { @@ -775,25 +748,24 @@ public class ConfigExtension { } public static String getLocalSupervisorId(Map conf) { - return (String)conf.get(LOCAL_SUPERVISOR_ID); + return (String) conf.get(LOCAL_SUPERVISOR_ID); } public static void setLocalSupervisorId(Map conf, String supervisorId) { conf.put(LOCAL_SUPERVISOR_ID, supervisorId); } - - protected static String WORKER_CPU_CORE_UPPER_LIMIT = - "worker.cpu.core.upper.limit"; + + protected static String WORKER_CPU_CORE_UPPER_LIMIT = "worker.cpu.core.upper.limit"; public static Integer getWorkerCpuCoreUpperLimit(Map conf) { return JStormUtils.parseInt(conf.get(WORKER_CPU_CORE_UPPER_LIMIT), 1); } - public static void setWorkerCpuCoreUpperLimit(Map conf, - Integer cpuUpperLimit) { + public static void setWorkerCpuCoreUpperLimit(Map conf, Integer cpuUpperLimit) { conf.put(WORKER_CPU_CORE_UPPER_LIMIT, cpuUpperLimit); } + protected static String CLUSTER_NAME = "cluster.name"; public static String getClusterName(Map conf) { @@ -803,33 +775,68 @@ public class ConfigExtension { public static void setClusterName(Map conf, String clusterName) { conf.put(CLUSTER_NAME, clusterName); } - + + protected static final String NIMBUS_CACHE_CLASS = "nimbus.cache.class"; - + public static String getNimbusCacheClass(Map conf) { - return (String)conf.get(NIMBUS_CACHE_CLASS); + return (String) conf.get(NIMBUS_CACHE_CLASS); } - + /** * if this is set, nimbus cache db will be clean when start nimbus */ protected static final String NIMBUS_CACHE_RESET = "nimbus.cache.reset"; - + public static boolean getNimbusCacheReset(Map conf) { return JStormUtils.parseBoolean(conf.get(NIMBUS_CACHE_RESET), true); } - + + /** + * if this is set, nimbus metrics cache db will be clean when start nimbus + */ + protected static final String NIMBUS_METRIC_CACHE_RESET = "nimbus.metric.cache.reset"; + + public static boolean getMetricCacheReset(Map conf) { + return JStormUtils.parseBoolean(conf.get(NIMBUS_METRIC_CACHE_RESET), false); + } + + public static final double DEFAULT_METRIC_SAMPLE_RATE = 0.10d; + + public static final String TOPOLOGY_METRIC_SAMPLE_RATE = "topology.metric.sample.rate"; + + public static double getMetricSampleRate(Map conf) { + double sampleRate = JStormUtils.parseDouble(conf.get(TOPOLOGY_METRIC_SAMPLE_RATE), DEFAULT_METRIC_SAMPLE_RATE); + if (!conf.containsKey(TOPOLOGY_METRIC_SAMPLE_RATE)) { + conf.put(TOPOLOGY_METRIC_SAMPLE_RATE, sampleRate); + } + return sampleRate; + } + public static final String CACHE_TIMEOUT_LIST = "cache.timeout.list"; - + public static List<Integer> getCacheTimeoutList(Map conf) { - return (List<Integer>)conf.get(CACHE_TIMEOUT_LIST); + return (List<Integer>) conf.get(CACHE_TIMEOUT_LIST); } - + protected static final String NIMBUS_METRICS_THREAD_NUM = "nimbus.metrics.thread.num"; + public static int getNimbusMetricThreadNum(Map conf) { return JStormUtils.parseInt(conf.get(NIMBUS_METRICS_THREAD_NUM), 2); } + public static final String METRIC_UPLOADER_CLASS = "nimbus.metric.uploader.class"; + + public static String getMetricUploaderClass(Map<Object, Object> conf) { + return (String) conf.get(METRIC_UPLOADER_CLASS); + } + + public static final String METRIC_QUERY_CLIENT_CLASS = "nimbus.metric.query.client.class"; + + public static String getMetricQueryClientClass(Map<Object, Object> conf) { + return (String) conf.get(METRIC_QUERY_CLIENT_CLASS); + } + protected static String TASK_MSG_BATCH_SIZE = "task.msg.batch.size"; public static Integer getTaskMsgBatchSize(Map conf) { @@ -839,9 +846,9 @@ public class ConfigExtension { public static void setTaskMsgBatchSize(Map conf, Integer batchSize) { conf.put(TASK_MSG_BATCH_SIZE, batchSize); } - - protected static String TASK_BATCH_TUPLE = "task.batch.tuple"; - + + protected static String TASK_BATCH_TUPLE = "task.batch.tuple"; + public static Boolean isTaskBatchTuple(Map conf) { return JStormUtils.parseBoolean(conf.get(TASK_BATCH_TUPLE), false); } @@ -849,19 +856,87 @@ public class ConfigExtension { public static void setTaskBatchTuple(Map conf, boolean isBatchTuple) { conf.put(TASK_BATCH_TUPLE, isBatchTuple); } - - protected static String TOPOLOGY_ENABLE_NETTY_METRICS = "topology.enable.netty.metrics"; - public static void setTopologyNettyMetrics(Map conf, boolean enable) { - conf.put(TOPOLOGY_ENABLE_NETTY_METRICS, enable); + + protected static String TOPOLOGY_MAX_WORKER_NUM_FOR_NETTY_METRICS = "topology.max.worker.num.for.netty.metrics"; + + public static void setTopologyMaxWorkerNumForNettyMetrics(Map conf, int num) { + conf.put(TOPOLOGY_MAX_WORKER_NUM_FOR_NETTY_METRICS, num); } - - public static Boolean isEnableTopologyNettyMetrics(Map conf) { - return (Boolean)conf.get(TOPOLOGY_ENABLE_NETTY_METRICS); + + public static int getTopologyMaxWorkerNumForNettyMetrics(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_MAX_WORKER_NUM_FOR_NETTY_METRICS), 200); } - + protected static String UI_ONE_TABLE_PAGE_SIZE = "ui.one.table.page.size"; + public static long getUiOneTablePageSize(Map conf) { - return JStormUtils.parseLong(conf.get(UI_ONE_TABLE_PAGE_SIZE), 200); + return JStormUtils.parseLong(conf.get(UI_ONE_TABLE_PAGE_SIZE), 200); } - -} + + protected static String MAX_PENDING_METRIC_NUM = "topology.max.pending.metric.num"; + + public static int getMaxPendingMetricNum(Map conf) { + return JStormUtils.parseInt(conf.get(MAX_PENDING_METRIC_NUM), 200); + } + + protected static String TOPOLOGY_MASTER_SINGLE_WORKER = "topology.master.single.worker"; + + public static Boolean getTopologyMasterSingleWorker(Map conf) { + Boolean ret = JStormUtils.parseBoolean(conf.get(TOPOLOGY_MASTER_SINGLE_WORKER)); + return ret; + } + + public static String TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH = "topology.backpressure.water.mark.high"; + + public static double getBackpressureWaterMarkHigh(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH), 0.8); + } + + public static String TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW = "topology.backpressure.water.mark.low"; + + public static double getBackpressureWaterMarkLow(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW), 0.05); + } + + protected static String TOPOLOGY_BACKPRESSURE_CHECK_INTERVAL = "topology.backpressure.check.interval"; + + public static int getBackpressureCheckIntervl(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_BACKPRESSURE_CHECK_INTERVAL), 1000); + } + + protected static String TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_NUMBER = "topology.backpressure.trigger.sample.number"; + + public static int getBackpressureTriggerSampleNumber(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_NUMBER), 4); + } + + protected static String TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_RATE = "topology.backpressure.trigger.sample.rate"; + + public static double getBackpressureTriggerSampleRate(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_RATE), 0.75); + } + + public static String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable"; + + public static boolean isBackpressureEnable(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BACKPRESSURE_ENABLE), false); + } + + public static String TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO = "topology.backpressure.coordinator.trigger.ratio"; + + public static double getBackpressureCoordinatorRatio(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO), 0.1); + } + + protected static String SUPERVISOR_CHECK_WORKER_BY_SYSTEM_INFO = "supervisor.check.worker.by.system.info"; + + public static boolean isCheckWorkerAliveBySystemInfo(Map conf) { + return JStormUtils.parseBoolean(conf.get(SUPERVISOR_CHECK_WORKER_BY_SYSTEM_INFO), true); + } + + protected static String TOPOLOGY_TASK_HEARTBEAT_SEND_NUMBER = "topology.task.heartbeat.send.number"; + + public static int getTopologyTaskHbSendNumber(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_TASK_HEARTBEAT_SEND_NUMBER), 2000); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java b/jstorm-core/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java index c994858..545a5f4 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/client/WorkerAssignment.java @@ -33,15 +33,12 @@ import backtype.storm.utils.Utils; import com.alibaba.jstorm.utils.JStormUtils; -public class WorkerAssignment extends WorkerSlot implements Serializable, - JSONAware { - private static final Logger LOG = LoggerFactory - .getLogger(WorkerAssignment.class); +public class WorkerAssignment extends WorkerSlot implements Serializable, JSONAware { + private static final Logger LOG = LoggerFactory.getLogger(WorkerAssignment.class); private static final long serialVersionUID = -3483047434535537861L; - private Map<String, Integer> componentToNum = - new HashMap<String, Integer>(); + private Map<String, Integer> componentToNum = new HashMap<String, Integer>(); private long mem; @@ -165,9 +162,7 @@ public class WorkerAssignment extends WorkerSlot implements Serializable, String jvm = map.get(JVM_TAG); Long mem = JStormUtils.parseLong(map.get(MEM_TAG)); Integer cpu = JStormUtils.parseInt(map.get(CPU_TAG)); - Map<String, Object> componentToNum = - (Map<String, Object>) Utils.from_json(map - .get(COMPONENTTONUM_TAG)); + Map<String, Object> componentToNum = (Map<String, Object>) Utils.from_json(map.get(COMPONENTTONUM_TAG)); WorkerAssignment ret = new WorkerAssignment(supervisorId, port); @@ -185,8 +180,7 @@ public class WorkerAssignment extends WorkerSlot implements Serializable, } for (Entry<String, Object> entry : componentToNum.entrySet()) { - ret.addComponent(entry.getKey(), - JStormUtils.parseInt(entry.getValue())); + ret.addComponent(entry.getKey(), JStormUtils.parseInt(entry.getValue())); } return ret; } catch (Exception e) { @@ -202,22 +196,16 @@ public class WorkerAssignment extends WorkerSlot implements Serializable, @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } @Override public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = - prime - * result - + ((componentToNum == null) ? 0 : componentToNum - .hashCode()); + result = prime * result + ((componentToNum == null) ? 0 : componentToNum.hashCode()); result = prime * result + cpu; - result = - prime * result + ((hostName == null) ? 0 : hostName.hashCode()); + result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); result = prime * result + ((jvm == null) ? 0 : jvm.hashCode()); result = prime * result + (int) (mem ^ (mem >>> 32)); return result; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/ConfigExtension.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/ConfigExtension.java b/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/ConfigExtension.java new file mode 100644 index 0000000..ab80c10 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/ConfigExtension.java @@ -0,0 +1,943 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.client.spout; + +import backtype.storm.Config; +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.client.WorkerAssignment; +import com.alibaba.jstorm.utils.JStormUtils; +import org.apache.commons.lang.StringUtils; + +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ConfigExtension { + /** + * if this configure has been set, the spout or bolt will log all receive tuples + * <p/> + * topology.debug just for logging all sent tuples + */ + protected static final String TOPOLOGY_DEBUG_RECV_TUPLE = "topology.debug.recv.tuple"; + + public static void setTopologyDebugRecvTuple(Map conf, boolean debug) { + conf.put(TOPOLOGY_DEBUG_RECV_TUPLE, Boolean.valueOf(debug)); + } + + public static Boolean isTopologyDebugRecvTuple(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_DEBUG_RECV_TUPLE), false); + } + + private static final String TOPOLOGY_ENABLE_METRIC_DEBUG = "topology.enable.metric.debug"; + + public static boolean isEnableMetricDebug(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_METRIC_DEBUG), false); + } + + private static final String TOPOLOGY_DEBUG_METRIC_NAMES = "topology.debug.metric.names"; + + public static String getDebugMetricNames(Map conf) { + String metrics = (String) conf.get(TOPOLOGY_DEBUG_METRIC_NAMES); + if (metrics == null) { + return ""; + } + return metrics; + } + + /** + * metrics switch, ONLY for performance test, DO NOT set it to false in production + */ + private static final String TOPOLOGY_ENABLE_METRICS = "topology.enable.metrics"; + + public static boolean isEnableMetrics(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_METRICS), true); + } + + /** + * port number of deamon httpserver server + */ + private static final Integer DEFAULT_DEAMON_HTTPSERVER_PORT = 7621; + + protected static final String SUPERVISOR_DEAMON_HTTPSERVER_PORT = "supervisor.deamon.logview.port"; + + public static Integer getSupervisorDeamonHttpserverPort(Map conf) { + return JStormUtils.parseInt(conf.get(SUPERVISOR_DEAMON_HTTPSERVER_PORT), DEFAULT_DEAMON_HTTPSERVER_PORT + 1); + } + + protected static final String NIMBUS_DEAMON_HTTPSERVER_PORT = "nimbus.deamon.logview.port"; + + public static Integer getNimbusDeamonHttpserverPort(Map conf) { + return JStormUtils.parseInt(conf.get(NIMBUS_DEAMON_HTTPSERVER_PORT), DEFAULT_DEAMON_HTTPSERVER_PORT); + } + + /** + * Worker gc parameter + */ + protected static final String WORKER_GC_CHILDOPTS = "worker.gc.childopts"; + + public static void setWorkerGc(Map conf, String gc) { + conf.put(WORKER_GC_CHILDOPTS, gc); + } + + public static String getWorkerGc(Map conf) { + return (String) conf.get(WORKER_GC_CHILDOPTS); + } + + protected static final String WOREKER_REDIRECT_OUTPUT = "worker.redirect.output"; + + public static boolean getWorkerRedirectOutput(Map conf) { + Object result = conf.get(WOREKER_REDIRECT_OUTPUT); + if (result == null) + return true; + return (Boolean) result; + } + + protected static final String WOREKER_REDIRECT_OUTPUT_FILE = "worker.redirect.output.file"; + + public static void setWorkerRedirectOutputFile(Map conf, String outputPath) { + conf.put(WOREKER_REDIRECT_OUTPUT_FILE, outputPath); + } + + public static String getWorkerRedirectOutputFile(Map conf) { + return (String) conf.get(WOREKER_REDIRECT_OUTPUT_FILE); + } + + /** + * Usually, spout finish prepare before bolt, so spout need wait several seconds so that bolt finish preparation + * <p/> + * By default, the setting is 30 seconds + */ + protected static final String SPOUT_DELAY_RUN = "spout.delay.run"; + + public static void setSpoutDelayRunSeconds(Map conf, int delay) { + conf.put(SPOUT_DELAY_RUN, Integer.valueOf(delay)); + } + + public static int getSpoutDelayRunSeconds(Map conf) { + return JStormUtils.parseInt(conf.get(SPOUT_DELAY_RUN), 30); + } + + /** + * Default ZMQ Pending queue size + */ + public static final int DEFAULT_ZMQ_MAX_QUEUE_MSG = 1000; + + /** + * One task will alloc how many memory slot, the default setting is 1 + */ + protected static final String MEM_SLOTS_PER_TASK = "memory.slots.per.task"; + + @Deprecated + public static void setMemSlotPerTask(Map conf, int slotNum) { + if (slotNum < 1) { + throw new InvalidParameterException(); + } + conf.put(MEM_SLOTS_PER_TASK, Integer.valueOf(slotNum)); + } + + /** + * One task will use cpu slot number, the default setting is 1 + */ + protected static final String CPU_SLOTS_PER_TASK = "cpu.slots.per.task"; + + @Deprecated + public static void setCpuSlotsPerTask(Map conf, int slotNum) { + if (slotNum < 1) { + throw new InvalidParameterException(); + } + conf.put(CPU_SLOTS_PER_TASK, Integer.valueOf(slotNum)); + } + + /** + * if the setting has been set, the component's task must run different node This is conflict with USE_SINGLE_NODE + */ + protected static final String TASK_ON_DIFFERENT_NODE = "task.on.differ.node"; + + public static void setTaskOnDifferentNode(Map conf, boolean isIsolate) { + conf.put(TASK_ON_DIFFERENT_NODE, Boolean.valueOf(isIsolate)); + } + + public static boolean isTaskOnDifferentNode(Map conf) { + return JStormUtils.parseBoolean(conf.get(TASK_ON_DIFFERENT_NODE), false); + } + + protected static final String SUPERVISOR_ENABLE_CGROUP = "supervisor.enable.cgroup"; + + public static boolean isEnableCgroup(Map conf) { + return JStormUtils.parseBoolean(conf.get(SUPERVISOR_ENABLE_CGROUP), false); + } + + /** + * If component or topology configuration set "use.old.assignment", will try use old assignment firstly + */ + protected static final String USE_OLD_ASSIGNMENT = "use.old.assignment"; + + public static void setUseOldAssignment(Map conf, boolean useOld) { + conf.put(USE_OLD_ASSIGNMENT, Boolean.valueOf(useOld)); + } + + public static boolean isUseOldAssignment(Map conf) { + return JStormUtils.parseBoolean(conf.get(USE_OLD_ASSIGNMENT), false); + } + + /** + * The supervisor's hostname + */ + protected static final String SUPERVISOR_HOSTNAME = "supervisor.hostname"; + public static final Object SUPERVISOR_HOSTNAME_SCHEMA = String.class; + + public static String getSupervisorHost(Map conf) { + return (String) conf.get(SUPERVISOR_HOSTNAME); + } + + protected static final String SUPERVISOR_USE_IP = "supervisor.use.ip"; + + public static boolean isSupervisorUseIp(Map conf) { + return JStormUtils.parseBoolean(conf.get(SUPERVISOR_USE_IP), false); + } + + protected static final String NIMBUS_USE_IP = "nimbus.use.ip"; + + public static boolean isNimbusUseIp(Map conf) { + return JStormUtils.parseBoolean(conf.get(NIMBUS_USE_IP), false); + } + + protected static final String TOPOLOGY_ENABLE_CLASSLOADER = "topology.enable.classloader"; + + public static boolean isEnableTopologyClassLoader(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_ENABLE_CLASSLOADER), false); + } + + public static void setEnableTopologyClassLoader(Map conf, boolean enable) { + conf.put(TOPOLOGY_ENABLE_CLASSLOADER, Boolean.valueOf(enable)); + } + + protected static String CLASSLOADER_DEBUG = "classloader.debug"; + + public static boolean isEnableClassloaderDebug(Map conf) { + return JStormUtils.parseBoolean(conf.get(CLASSLOADER_DEBUG), false); + } + + public static void setEnableClassloaderDebug(Map conf, boolean enable) { + conf.put(CLASSLOADER_DEBUG, enable); + } + + protected static final String CONTAINER_NIMBUS_HEARTBEAT = "container.nimbus.heartbeat"; + + /** + * Get to know whether nimbus is run under Apsara/Yarn container + */ + public static boolean isEnableContainerNimbus() { + String path = System.getenv(CONTAINER_NIMBUS_HEARTBEAT); + + if (StringUtils.isBlank(path)) { + return false; + } else { + return true; + } + } + + /** + * Get Apsara/Yarn nimbus container's hearbeat dir + */ + public static String getContainerNimbusHearbeat() { + return System.getenv(CONTAINER_NIMBUS_HEARTBEAT); + } + + protected static final String CONTAINER_SUPERVISOR_HEARTBEAT = "container.supervisor.heartbeat"; + + /** + * Get to know whether supervisor is run under Apsara/Yarn supervisor container + */ + public static boolean isEnableContainerSupervisor() { + String path = System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT); + + if (StringUtils.isBlank(path)) { + return false; + } else { + return true; + } + } + + /** + * Get Apsara/Yarn supervisor container's hearbeat dir + */ + public static String getContainerSupervisorHearbeat() { + return (String) System.getenv(CONTAINER_SUPERVISOR_HEARTBEAT); + } + + protected static final String CONTAINER_HEARTBEAT_TIMEOUT_SECONDS = "container.heartbeat.timeout.seconds"; + + public static int getContainerHeartbeatTimeoutSeconds(Map conf) { + return JStormUtils.parseInt(conf.get(CONTAINER_HEARTBEAT_TIMEOUT_SECONDS), 240); + } + + protected static final String CONTAINER_HEARTBEAT_FREQUENCE = "container.heartbeat.frequence"; + + public static int getContainerHeartbeatFrequence(Map conf) { + return JStormUtils.parseInt(conf.get(CONTAINER_HEARTBEAT_FREQUENCE), 10); + } + + protected static final String JAVA_SANDBOX_ENABLE = "java.sandbox.enable"; + + public static boolean isJavaSandBoxEnable(Map conf) { + return JStormUtils.parseBoolean(conf.get(JAVA_SANDBOX_ENABLE), false); + } + + protected static String SPOUT_SINGLE_THREAD = "spout.single.thread"; + + public static boolean isSpoutSingleThread(Map conf) { + return JStormUtils.parseBoolean(conf.get(SPOUT_SINGLE_THREAD), false); + } + + public static void setSpoutSingleThread(Map conf, boolean enable) { + conf.put(SPOUT_SINGLE_THREAD, enable); + } + + protected static String WORKER_STOP_WITHOUT_SUPERVISOR = "worker.stop.without.supervisor"; + + public static boolean isWorkerStopWithoutSupervisor(Map conf) { + return JStormUtils.parseBoolean(conf.get(WORKER_STOP_WITHOUT_SUPERVISOR), false); + } + + protected static String CGROUP_ROOT_DIR = "supervisor.cgroup.rootdir"; + + public static String getCgroupRootDir(Map conf) { + return (String) conf.get(CGROUP_ROOT_DIR); + } + + protected static String NETTY_TRANSFER_ASYNC_AND_BATCH = "storm.messaging.netty.transfer.async.batch"; + + public static boolean isNettyTransferAsyncBatch(Map conf) { + return JStormUtils.parseBoolean(conf.get(NETTY_TRANSFER_ASYNC_AND_BATCH), true); + } + + protected static final String USE_USERDEFINE_ASSIGNMENT = "use.userdefine.assignment"; + + public static void setUserDefineAssignment(Map conf, List<WorkerAssignment> userDefines) { + List<String> ret = new ArrayList<String>(); + for (WorkerAssignment worker : userDefines) { + ret.add(Utils.to_json(worker)); + } + conf.put(USE_USERDEFINE_ASSIGNMENT, ret); + } + + public static List<WorkerAssignment> getUserDefineAssignment(Map conf) { + List<WorkerAssignment> ret = new ArrayList<WorkerAssignment>(); + if (conf.get(USE_USERDEFINE_ASSIGNMENT) == null) + return ret; + for (String worker : (List<String>) conf.get(USE_USERDEFINE_ASSIGNMENT)) { + ret.add(WorkerAssignment.parseFromObj(Utils.from_json(worker))); + } + return ret; + } + + protected static String NETTY_PENDING_BUFFER_TIMEOUT = "storm.messaging.netty.pending.buffer.timeout"; + + public static void setNettyPendingBufferTimeout(Map conf, Long timeout) { + conf.put(NETTY_PENDING_BUFFER_TIMEOUT, timeout); + } + + public static long getNettyPendingBufferTimeout(Map conf) { + int messageTimeout = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 120); + return JStormUtils.parseLong(conf.get(NETTY_PENDING_BUFFER_TIMEOUT), messageTimeout * 1000); + } + + protected static final String MEMSIZE_PER_WORKER = "worker.memory.size"; + + public static void setMemSizePerWorker(Map conf, long memSize) { + conf.put(MEMSIZE_PER_WORKER, memSize); + } + + public static void setMemSizePerWorkerByKB(Map conf, long memSize) { + long size = memSize * 1024l; + setMemSizePerWorker(conf, size); + } + + public static void setMemSizePerWorkerByMB(Map conf, long memSize) { + long size = memSize * 1024l; + setMemSizePerWorkerByKB(conf, size); + } + + public static void setMemSizePerWorkerByGB(Map conf, long memSize) { + long size = memSize * 1024l; + setMemSizePerWorkerByMB(conf, size); + } + + public static long getMemSizePerWorker(Map conf) { + long size = JStormUtils.parseLong(conf.get(MEMSIZE_PER_WORKER), JStormUtils.SIZE_1_G * 2); + return size > 0 ? size : JStormUtils.SIZE_1_G * 2; + } + + protected static final String MIN_MEMSIZE_PER_WORKER = "worker.memory.min.size"; + + public static void setMemMinSizePerWorker(Map conf, long memSize) { + conf.put(MIN_MEMSIZE_PER_WORKER, memSize); + } + + public static long getMemMinSizePerWorker(Map conf) { + long maxMemSize = getMemSizePerWorker(conf); + + Long size = JStormUtils.parseLong(conf.get(MIN_MEMSIZE_PER_WORKER)); + long minMemSize = (size == null || size == 0) ? maxMemSize : size; + + return minMemSize; + } + + protected static final String CPU_SLOT_PER_WORKER = "worker.cpu.slot.num"; + + public static void setCpuSlotNumPerWorker(Map conf, int slotNum) { + conf.put(CPU_SLOT_PER_WORKER, slotNum); + } + + public static int getCpuSlotPerWorker(Map conf) { + int slot = JStormUtils.parseInt(conf.get(CPU_SLOT_PER_WORKER), 1); + return slot > 0 ? slot : 1; + } + + protected static String TOPOLOGY_PERFORMANCE_METRICS = "topology.performance.metrics"; + + public static boolean isEnablePerformanceMetrics(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_PERFORMANCE_METRICS), true); + } + + public static void setPerformanceMetrics(Map conf, boolean isEnable) { + conf.put(TOPOLOGY_PERFORMANCE_METRICS, isEnable); + } + + protected static String NETTY_BUFFER_THRESHOLD_SIZE = "storm.messaging.netty.buffer.threshold"; + + public static long getNettyBufferThresholdSize(Map conf) { + return JStormUtils.parseLong(conf.get(NETTY_BUFFER_THRESHOLD_SIZE), 8 * JStormUtils.SIZE_1_M); + } + + public static void setNettyBufferThresholdSize(Map conf, long size) { + conf.put(NETTY_BUFFER_THRESHOLD_SIZE, size); + } + + protected static String NETTY_MAX_SEND_PENDING = "storm.messaging.netty.max.pending"; + + public static void setNettyMaxSendPending(Map conf, long pending) { + conf.put(NETTY_MAX_SEND_PENDING, pending); + } + + public static long getNettyMaxSendPending(Map conf) { + return JStormUtils.parseLong(conf.get(NETTY_MAX_SEND_PENDING), 16); + } + + protected static String DISRUPTOR_USE_SLEEP = "disruptor.use.sleep"; + + public static boolean isDisruptorUseSleep(Map conf) { + return JStormUtils.parseBoolean(conf.get(DISRUPTOR_USE_SLEEP), true); + } + + public static void setDisruptorUseSleep(Map conf, boolean useSleep) { + conf.put(DISRUPTOR_USE_SLEEP, useSleep); + } + + public static boolean isTopologyContainAcker(Map conf) { + int num = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), 1); + if (num > 0) { + return true; + } else { + return false; + } + } + + protected static String NETTY_SYNC_MODE = "storm.messaging.netty.sync.mode"; + + public static boolean isNettySyncMode(Map conf) { + return JStormUtils.parseBoolean(conf.get(NETTY_SYNC_MODE), false); + } + + public static void setNettySyncMode(Map conf, boolean sync) { + conf.put(NETTY_SYNC_MODE, sync); + } + + protected static String NETTY_ASYNC_BLOCK = "storm.messaging.netty.async.block"; + + public static boolean isNettyASyncBlock(Map conf) { + return JStormUtils.parseBoolean(conf.get(NETTY_ASYNC_BLOCK), true); + } + + public static void setNettyASyncBlock(Map conf, boolean block) { + conf.put(NETTY_ASYNC_BLOCK, block); + } + + protected static String ALIMONITOR_METRICS_POST = "topology.alimonitor.metrics.post"; + + public static boolean isAlimonitorMetricsPost(Map conf) { + return JStormUtils.parseBoolean(conf.get(ALIMONITOR_METRICS_POST), true); + } + + public static void setAlimonitorMetricsPost(Map conf, boolean post) { + conf.put(ALIMONITOR_METRICS_POST, post); + } + + public static String TASK_CLEANUP_TIMEOUT_SEC = "task.cleanup.timeout.sec"; + + public static int getTaskCleanupTimeoutSec(Map conf) { + return JStormUtils.parseInt(conf.get(TASK_CLEANUP_TIMEOUT_SEC), 10); + } + + public static void setTaskCleanupTimeoutSec(Map conf, int timeout) { + conf.put(TASK_CLEANUP_TIMEOUT_SEC, timeout); + } + + protected static String UI_CLUSTERS = "ui.clusters"; + protected static String UI_CLUSTER_NAME = "name"; + protected static String UI_CLUSTER_ZK_ROOT = "zkRoot"; + protected static String UI_CLUSTER_ZK_SERVERS = "zkServers"; + protected static String UI_CLUSTER_ZK_PORT = "zkPort"; + + public static List<Map> getUiClusters(Map conf) { + return (List<Map>) conf.get(UI_CLUSTERS); + } + + public static void setUiClusters(Map conf, List<Map> uiClusters) { + conf.put(UI_CLUSTERS, uiClusters); + } + + public static Map getUiClusterInfo(List<Map> uiClusters, String name) { + Map ret = null; + for (Map cluster : uiClusters) { + String clusterName = getUiClusterName(cluster); + if (clusterName.equals(name)) { + ret = cluster; + break; + } + } + + return ret; + } + + public static String getUiClusterName(Map uiCluster) { + return (String) uiCluster.get(UI_CLUSTER_NAME); + } + + public static String getUiClusterZkRoot(Map uiCluster) { + return (String) uiCluster.get(UI_CLUSTER_ZK_ROOT); + } + + public static List<String> getUiClusterZkServers(Map uiCluster) { + return (List<String>) uiCluster.get(UI_CLUSTER_ZK_SERVERS); + } + + public static Integer getUiClusterZkPort(Map uiCluster) { + return JStormUtils.parseInt(uiCluster.get(UI_CLUSTER_ZK_PORT)); + } + + + protected static String SPOUT_PEND_FULL_SLEEP = "spout.pending.full.sleep"; + + public static boolean isSpoutPendFullSleep(Map conf) { + return JStormUtils.parseBoolean(conf.get(SPOUT_PEND_FULL_SLEEP), false); + } + + public static void setSpoutPendFullSleep(Map conf, boolean sleep) { + conf.put(SPOUT_PEND_FULL_SLEEP, sleep); + + } + + protected static String LOGVIEW_ENCODING = "supervisor.deamon.logview.encoding"; + protected static String UTF8 = "utf-8"; + + public static String getLogViewEncoding(Map conf) { + String ret = (String) conf.get(LOGVIEW_ENCODING); + if (ret == null) + ret = UTF8; + return ret; + } + + public static void setLogViewEncoding(Map conf, String enc) { + conf.put(LOGVIEW_ENCODING, enc); + } + + protected static String LOG_PAGE_SIZE = "log.page.size"; + + public static int getLogPageSize(Map conf) { + return JStormUtils.parseInt(conf.get(LOG_PAGE_SIZE), 32 * 1024); + } + + public static void setLogPageSize(Map conf, int pageSize) { + conf.put(LOG_PAGE_SIZE, pageSize); + } + + public static String TASK_STATUS_ACTIVE = "Active"; + public static String TASK_STATUS_INACTIVE = "Inactive"; + public static String TASK_STATUS_STARTING = "Starting"; + + protected static String ALIMONITOR_TOPO_METIRC_NAME = "topology.alimonitor.topo.metrics.name"; + protected static String ALIMONITOR_TASK_METIRC_NAME = "topology.alimonitor.task.metrics.name"; + protected static String ALIMONITOR_WORKER_METIRC_NAME = "topology.alimonitor.worker.metrics.name"; + protected static String ALIMONITOR_USER_METIRC_NAME = "topology.alimonitor.user.metrics.name"; + + public static String getAlmonTopoMetricName(Map conf) { + return (String) conf.get(ALIMONITOR_TOPO_METIRC_NAME); + } + + public static String getAlmonTaskMetricName(Map conf) { + return (String) conf.get(ALIMONITOR_TASK_METIRC_NAME); + } + + public static String getAlmonWorkerMetricName(Map conf) { + return (String) conf.get(ALIMONITOR_WORKER_METIRC_NAME); + } + + public static String getAlmonUserMetricName(Map conf) { + return (String) conf.get(ALIMONITOR_USER_METIRC_NAME); + } + + protected static String SPOUT_PARALLELISM = "topology.spout.parallelism"; + protected static String BOLT_PARALLELISM = "topology.bolt.parallelism"; + + public static Integer getSpoutParallelism(Map conf, String componentName) { + Integer ret = null; + Map<String, String> map = (Map<String, String>) (conf.get(SPOUT_PARALLELISM)); + if (map != null) + ret = JStormUtils.parseInt(map.get(componentName)); + return ret; + } + + public static Integer getBoltParallelism(Map conf, String componentName) { + Integer ret = null; + Map<String, String> map = (Map<String, String>) (conf.get(BOLT_PARALLELISM)); + if (map != null) + ret = JStormUtils.parseInt(map.get(componentName)); + return ret; + } + + protected static String TOPOLOGY_BUFFER_SIZE_LIMITED = "topology.buffer.size.limited"; + + public static void setTopologyBufferSizeLimited(Map conf, boolean limited) { + conf.put(TOPOLOGY_BUFFER_SIZE_LIMITED, limited); + } + + public static boolean getTopologyBufferSizeLimited(Map conf) { + boolean isSynchronized = isNettySyncMode(conf); + if (isSynchronized == true) { + return true; + } + + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BUFFER_SIZE_LIMITED), true); + + } + + protected static String SUPERVISOR_SLOTS_PORTS_BASE = "supervisor.slots.ports.base"; + + public static int getSupervisorSlotsPortsBase(Map conf) { + return JStormUtils.parseInt(conf.get(SUPERVISOR_SLOTS_PORTS_BASE), 6800); + } + + // SUPERVISOR_SLOTS_PORTS_BASE don't provide setting function, it must be + // set by configuration + + protected static String SUPERVISOR_SLOTS_PORT_CPU_WEIGHT = "supervisor.slots.port.cpu.weight"; + + public static double getSupervisorSlotsPortCpuWeight(Map conf) { + Object value = conf.get(SUPERVISOR_SLOTS_PORT_CPU_WEIGHT); + Double ret = JStormUtils.convertToDouble(value); + if (ret == null || ret <= 0) { + return 1.2; + } else { + return ret; + } + } + + protected static String SUPERVISOR_SLOTS_PORT_MEM_WEIGHT = "supervisor.slots.port.mem.weight"; + + public static double getSupervisorSlotsPortMemWeight(Map conf) { + Object value = conf.get(SUPERVISOR_SLOTS_PORT_MEM_WEIGHT); + Double ret = JStormUtils.convertToDouble(value); + if (ret == null || ret <= 0) { + return 0.7; + } else { + return ret; + } + } + + // SUPERVISOR_SLOTS_PORT_CPU_WEIGHT don't provide setting function, it must + // be set by configuration + + protected static String USER_DEFINED_LOG4J_CONF = "user.defined.log4j.conf"; + + public static String getUserDefinedLog4jConf(Map conf) { + return (String) conf.get(USER_DEFINED_LOG4J_CONF); + } + + public static void setUserDefinedLog4jConf(Map conf, String fileName) { + conf.put(USER_DEFINED_LOG4J_CONF, fileName); + } + + protected static String USER_DEFINED_LOGBACK_CONF = "user.defined.logback.conf"; + + public static String getUserDefinedLogbackConf(Map conf) { + return (String) conf.get(USER_DEFINED_LOGBACK_CONF); + } + + public static void setUserDefinedLogbackConf(Map conf, String fileName) { + conf.put(USER_DEFINED_LOGBACK_CONF, fileName); + } + + protected static String TASK_ERROR_INFO_REPORT_INTERVAL = "topology.task.error.report.interval"; + + public static Integer getTaskErrorReportInterval(Map conf) { + return JStormUtils.parseInt(conf.get(TASK_ERROR_INFO_REPORT_INTERVAL), 60); + } + + public static void setTaskErrorReportInterval(Map conf, Integer interval) { + conf.put(TASK_ERROR_INFO_REPORT_INTERVAL, interval); + } + + protected static String DEFAULT_CACHE_TIMEOUT = "default.cache.timeout"; + + public static int getDefaultCacheTimeout(Map conf) { + return JStormUtils.parseInt(conf.get(DEFAULT_CACHE_TIMEOUT), 60); + } + + public static void setDefaultCacheTimeout(Map conf, int timeout) { + conf.put(DEFAULT_CACHE_TIMEOUT, timeout); + } + + protected static String WORKER_MERTRIC_REPORT_CHECK_FREQUENCY = "worker.metric.report.frequency.secs"; + + public static int getWorkerMetricReportCheckFrequency(Map conf) { + return JStormUtils.parseInt(conf.get(WORKER_MERTRIC_REPORT_CHECK_FREQUENCY), 60); + } + + public static void setWorkerMetricReportFrequency(Map conf, int frequence) { + conf.put(WORKER_MERTRIC_REPORT_CHECK_FREQUENCY, frequence); + } + + /** + * Store local worker port/workerId/supervisorId to configuration + */ + protected static String LOCAL_WORKER_PORT = "local.worker.port"; + protected static String LOCLA_WORKER_ID = "local.worker.id"; + protected static String LOCAL_SUPERVISOR_ID = "local.supervisor.id"; + + public static int getLocalWorkerPort(Map conf) { + return JStormUtils.parseInt(conf.get(LOCAL_WORKER_PORT)); + } + + public static void setLocalWorkerPort(Map conf, int port) { + conf.put(LOCAL_WORKER_PORT, port); + } + + public static String getLocalWorkerId(Map conf) { + return (String) conf.get(LOCLA_WORKER_ID); + } + + public static void setLocalWorkerId(Map conf, String workerId) { + conf.put(LOCLA_WORKER_ID, workerId); + } + + public static String getLocalSupervisorId(Map conf) { + return (String) conf.get(LOCAL_SUPERVISOR_ID); + } + + public static void setLocalSupervisorId(Map conf, String supervisorId) { + conf.put(LOCAL_SUPERVISOR_ID, supervisorId); + } + + protected static String WORKER_CPU_CORE_UPPER_LIMIT = "worker.cpu.core.upper.limit"; + + public static Integer getWorkerCpuCoreUpperLimit(Map conf) { + return JStormUtils.parseInt(conf.get(WORKER_CPU_CORE_UPPER_LIMIT), 1); + } + + public static void setWorkerCpuCoreUpperLimit(Map conf, Integer cpuUpperLimit) { + conf.put(WORKER_CPU_CORE_UPPER_LIMIT, cpuUpperLimit); + } + + + protected static String CLUSTER_NAME = "cluster.name"; + + public static String getClusterName(Map conf) { + return (String) conf.get(CLUSTER_NAME); + } + + public static void setClusterName(Map conf, String clusterName) { + conf.put(CLUSTER_NAME, clusterName); + } + + + protected static final String NIMBUS_CACHE_CLASS = "nimbus.cache.class"; + + public static String getNimbusCacheClass(Map conf) { + return (String) conf.get(NIMBUS_CACHE_CLASS); + } + + /** + * if this is set, nimbus cache db will be clean when start nimbus + */ + protected static final String NIMBUS_CACHE_RESET = "nimbus.cache.reset"; + + public static boolean getNimbusCacheReset(Map conf) { + return JStormUtils.parseBoolean(conf.get(NIMBUS_CACHE_RESET), true); + } + + /** + * if this is set, nimbus metrics cache db will be clean when start nimbus + */ + protected static final String NIMBUS_METRIC_CACHE_RESET = "nimbus.metric.cache.reset"; + + public static boolean getMetricCacheReset(Map conf) { + return JStormUtils.parseBoolean(conf.get(NIMBUS_METRIC_CACHE_RESET), false); + } + + public static final double DEFAULT_METRIC_SAMPLE_RATE = 0.10d; + + public static final String TOPOLOGY_METRIC_SAMPLE_RATE = "topology.metric.sample.rate"; + + public static double getMetricSampleRate(Map conf) { + double sampleRate = JStormUtils.parseDouble(conf.get(TOPOLOGY_METRIC_SAMPLE_RATE), DEFAULT_METRIC_SAMPLE_RATE); + if (!conf.containsKey(TOPOLOGY_METRIC_SAMPLE_RATE)) { + conf.put(TOPOLOGY_METRIC_SAMPLE_RATE, sampleRate); + } + return sampleRate; + } + + public static final String CACHE_TIMEOUT_LIST = "cache.timeout.list"; + + public static List<Integer> getCacheTimeoutList(Map conf) { + return (List<Integer>) conf.get(CACHE_TIMEOUT_LIST); + } + + protected static final String NIMBUS_METRICS_THREAD_NUM = "nimbus.metrics.thread.num"; + + public static int getNimbusMetricThreadNum(Map conf) { + return JStormUtils.parseInt(conf.get(NIMBUS_METRICS_THREAD_NUM), 2); + } + + public static final String METRIC_UPLOADER_CLASS = "nimbus.metric.uploader.class"; + + public static String getMetricUploaderClass(Map<Object, Object> conf) { + return (String) conf.get(METRIC_UPLOADER_CLASS); + } + + public static final String METRIC_QUERY_CLIENT_CLASS = "nimbus.metric.query.client.class"; + + public static String getMetricQueryClientClass(Map<Object, Object> conf) { + return (String) conf.get(METRIC_QUERY_CLIENT_CLASS); + } + + protected static String TASK_MSG_BATCH_SIZE = "task.msg.batch.size"; + + public static Integer getTaskMsgBatchSize(Map conf) { + return JStormUtils.parseInt(conf.get(TASK_MSG_BATCH_SIZE), 1); + } + + public static void setTaskMsgBatchSize(Map conf, Integer batchSize) { + conf.put(TASK_MSG_BATCH_SIZE, batchSize); + } + + protected static String TASK_BATCH_TUPLE = "task.batch.tuple"; + + public static Boolean isTaskBatchTuple(Map conf) { + return JStormUtils.parseBoolean(conf.get(TASK_BATCH_TUPLE), false); + } + + public static void setTaskBatchTuple(Map conf, boolean isBatchTuple) { + conf.put(TASK_BATCH_TUPLE, isBatchTuple); + } + + protected static String TOPOLOGY_MAX_WORKER_NUM_FOR_NETTY_METRICS = "topology.max.worker.num.for.netty.metrics"; + + public static void setTopologyMaxWorkerNumForNettyMetrics(Map conf, int num) { + conf.put(TOPOLOGY_MAX_WORKER_NUM_FOR_NETTY_METRICS, num); + } + + public static int getTopologyMaxWorkerNumForNettyMetrics(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_MAX_WORKER_NUM_FOR_NETTY_METRICS), 100); + } + + protected static String UI_ONE_TABLE_PAGE_SIZE = "ui.one.table.page.size"; + + public static long getUiOneTablePageSize(Map conf) { + return JStormUtils.parseLong(conf.get(UI_ONE_TABLE_PAGE_SIZE), 200); + } + + protected static String MAX_PENDING_METRIC_NUM = "topology.max.pending.metric.num"; + + public static int getMaxPendingMetricNum(Map conf) { + return JStormUtils.parseInt(conf.get(MAX_PENDING_METRIC_NUM), 200); + } + + protected static String TOPOLOGY_MASTER_SINGLE_WORKER = "topology.master.single.worker"; + + public static Boolean getTopologyMasterSingleWorker(Map conf) { + Boolean ret = JStormUtils.parseBoolean(conf.get(TOPOLOGY_MASTER_SINGLE_WORKER)); + return ret; + } + + public static String TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH = "topology.backpressure.water.mark.high"; + + public static double getBackpressureWaterMarkHigh(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_WATER_MARK_HIGH), 0.8); + } + + public static String TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW = "topology.backpressure.water.mark.low"; + + public static double getBackpressureWaterMarkLow(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_WATER_MARK_LOW), 0.05); + } + + protected static String TOPOLOGY_BACKPRESSURE_CHECK_INTERVAL = "topology.backpressure.check.interval"; + + public static int getBackpressureCheckIntervl(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_BACKPRESSURE_CHECK_INTERVAL), 1000); + } + + protected static String TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_NUMBER = "topology.backpressure.trigger.sample.number"; + + public static int getBackpressureTriggerSampleNumber(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_NUMBER), 4); + } + + protected static String TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_RATE = "topology.backpressure.trigger.sample.rate"; + + public static double getBackpressureTriggerSampleRate(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_TRIGGER_SAMPLE_RATE), 0.75); + } + + public static String TOPOLOGY_BACKPRESSURE_ENABLE = "topology.backpressure.enable"; + + public static boolean isBackpressureEnable(Map conf) { + return JStormUtils.parseBoolean(conf.get(TOPOLOGY_BACKPRESSURE_ENABLE), false); + } + + public static String TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO = "topology.backpressure.coordinator.trigger.ratio"; + + public static double getBackpressureCoordinatorRatio(Map conf) { + return JStormUtils.parseDouble(conf.get(TOPOLOGY_BACKPRESSURE_COORDINATOR_RATIO), 0.1); + } + + protected static String SUPERVISOR_CHECK_WORKER_BY_SYSTEM_INFO = "supervisor.check.worker.by.system.info"; + + public static boolean isCheckWorkerAliveBySystemInfo(Map conf) { + return JStormUtils.parseBoolean(conf.get(SUPERVISOR_CHECK_WORKER_BY_SYSTEM_INFO), true); + } + + protected static String TOPOLOGY_TASK_HEARTBEAT_SEND_NUMBER = "topology.task.heartbeat.send.number"; + + public static int getTopologyTaskHbSendNumber(Map conf) { + return JStormUtils.parseInt(conf.get(TOPOLOGY_TASK_HEARTBEAT_SEND_NUMBER), 2000); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java b/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java index df88ad8..01d9da4 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IAckValueSpout.java @@ -22,8 +22,7 @@ import java.util.List; /** * This interface will list emit values when tuple success * - * if spout implement this interface, spout won't call ISpout.ack() when tuple - * success + * if spout implement this interface, spout won't call ISpout.ack() when tuple success * * @author longda */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java b/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java index 9bebfa4..8d16aba 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java @@ -22,8 +22,7 @@ import java.util.List; /** * This interface will list emit values when tuple fails * - * if spout implement this interface, spout won't call ISpout.fail() when tuple - * fail + * if spout implement this interface, spout won't call ISpout.fail() when tuple fail * * @author longda */
