GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2828#discussion_r217175834
--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java ---
@@ -1096,41 +1104,230 @@ public static String getWindowHint(String window) {
result.put("transferred", commonAggregateStats.get_transferred());
result.put("acked", commonAggregateStats.get_acked());
result.put("failed", commonAggregateStats.get_failed());
- result.put(
- "requestedMemOnHeap",
-
commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME)
- );
- result.put(
- "requestedMemOffHeap",
-
commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME));
- result.put(
- "requestedCpu" ,
-
commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME));
+ if (commonAggregateStats.is_set_resources_map()) {
+ result.put(
+ "requestedMemOnHeap",
+
commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME)
+ );
+ result.put(
+ "requestedMemOffHeap",
+
commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME));
+ result.put(
+ "requestedCpu",
+
commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME));
+ }
return result;
}
+ private static String getTruncatedErrorString(String errorString) {
+ return errorString.substring(0, Math.min(errorString.length(),
200));
+ }
+
private static Map<String, Object> getSpoutAggStatsMap(
- ComponentAggregateStats componentAggregateStats, String
spoutId) {
+ ComponentAggregateStats componentAggregateStats, String
window) {
+ Map<String, Object> result = new HashMap();
+ SpoutAggregateStats spoutAggregateStats =
componentAggregateStats.get_specific_stats().get_spout();
+ CommonAggregateStats commonStats =
componentAggregateStats.get_common_stats();
+ result.putAll(getCommonAggStatsMap(commonStats));
+ result.put("window", window);
+ result.put("windowPretty", getWindowHint(window));
+ result.put("emitted", commonStats.get_emitted());
+ result.put("transferred", commonStats.get_transferred());
+ result.put("acked", commonStats.get_acked());
+ result.put("failed", commonStats.get_failed());
+ result.put("completeLatency",
spoutAggregateStats.get_complete_latency_ms());
+
+
+ ErrorInfo lastError = componentAggregateStats.get_last_error();
+ result.put("lastError", Objects.isNull(lastError) ? "" :
getTruncatedErrorString(lastError.get_error()));
+ return result;
+ }
+
+ private static Map<String, Object> getBoltAggStatsMap(
+ ComponentAggregateStats componentAggregateStats, String
window) {
+ Map<String, Object> result = new HashMap();
+ CommonAggregateStats commonStats =
componentAggregateStats.get_common_stats();
+ result.putAll(getCommonAggStatsMap(commonStats));
+ result.put("window", window);
+ result.put("windowPretty", getWindowHint(window));
+ result.put("emitted", commonStats.get_emitted());
+ result.put("transferred", commonStats.get_transferred());
+ result.put("acked", commonStats.get_acked());
+ result.put("failed", commonStats.get_failed());
+ BoltAggregateStats boltAggregateStats =
componentAggregateStats.get_specific_stats().get_bolt();
+ result.put("executeLatency",
StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
+ result.put("executed", boltAggregateStats.get_executed());
+ result.put("processLatency",
StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
+ result.put("capacity",
StatsUtil.floatStr(boltAggregateStats.get_capacity()));
+ return result;
+ }
+
+ private static Long nullToZero(Long value) {
+ return Objects.isNull(value) ? value : 0;
+ }
+
+ private static Double nullToZero(Double value) {
+ return Objects.isNull(value) ? value : 0;
+ }
+
+ private static Map<String, Object> getBoltInputStats(GlobalStreamId
globalStreamId,
+
ComponentAggregateStats componentAggregateStats) {
+ Map<String, Object> result = new HashMap();
+ SpecificAggregateStats specificAggregateStats =
componentAggregateStats.get_specific_stats();
+ BoltAggregateStats boltAggregateStats =
specificAggregateStats.get_bolt();
+ CommonAggregateStats commonAggregateStats =
componentAggregateStats.get_common_stats();
+ String componentId = globalStreamId.get_componentId();
+ result.put("component", componentId);
+ result.put("encodedComponentId", URLEncoder.encode(componentId));
+ result.put("stream", globalStreamId.get_streamId());
+ result.put("executeLatency",
StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
+ result.put("processLatency",
StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
+ result.put("executed",
nullToZero(boltAggregateStats.get_executed()));
+ result.put("acked", nullToZero(commonAggregateStats.get_acked()));
+ result.put("failed",
nullToZero(commonAggregateStats.get_failed()));
+ return result;
+ }
+
+ private static Map<String, Object> getBoltOutputStats(String streamId,
+
ComponentAggregateStats componentAggregateStats) {
+ Map<String, Object> result = new HashMap();
+ result.put("stream", streamId);
+ CommonAggregateStats commonStats =
componentAggregateStats.get_common_stats();
+ result.put("emitted", nullToZero(commonStats.get_emitted()));
+ result.put("transferred",
nullToZero(commonStats.get_transferred()));
+ return result;
+ }
+
+ private static Map<String, Object> getSpoutOutputStats(String streamId,
+
ComponentAggregateStats componentAggregateStats) {
+ SpecificAggregateStats specificAggregateStats =
componentAggregateStats.get_specific_stats();
+ SpoutAggregateStats spoutAggregateStats =
specificAggregateStats.get_spout();
+ Map<String, Object> result = new HashMap();
+ result.put("stream", streamId);
+ CommonAggregateStats commonStats =
componentAggregateStats.get_common_stats();
+ result.put("emitted", nullToZero(commonStats.get_emitted()));
+ result.put("transferred",
nullToZero(commonStats.get_transferred()));
+ result.put("completeLatency",
StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms()));
+ result.put("acked", nullToZero(commonStats.get_acked()));
+ result.put("failed", nullToZero(commonStats.get_failed()));
+ return result;
+ }
+
+ private static Map<String, Object> getBoltExecutorStats(String
topologyId, Map<String, Object> config,
+
ExecutorAggregateStats executorAggregateStats) {
+ Map<String, Object> result = new HashMap();
+ ExecutorSummary executorSummary =
executorAggregateStats.get_exec_summary();
+ ExecutorInfo executorInfo = executorSummary.get_executor_info();
+ ComponentAggregateStats componentAggregateStats =
executorAggregateStats.get_stats();
+ SpecificAggregateStats specificAggregateStats =
componentAggregateStats.get_specific_stats();
+ BoltAggregateStats boltAggregateStats =
specificAggregateStats.get_bolt();
+ CommonAggregateStats commonAggregateStats =
componentAggregateStats.get_common_stats();
+ String executorId = prettyExecutorInfo(executorInfo);
+ result.put("id", executorId);
+ result.put("encodedId", URLEncoder.encode(executorId));
+ result.put("uptime",
prettyUptimeSec(executorSummary.get_uptime_secs()));
+ result.put("uptimeSeconds", executorSummary.get_uptime_secs());
+ String host = executorSummary.get_host();
+ result.put("host", host);
+ int port = executorSummary.get_port();
+ result.put("port", port);
+ result.put("emitted",
nullToZero(commonAggregateStats.get_emitted()));
+ result.put("transferred",
nullToZero(commonAggregateStats.get_transferred()));
+ result.put("capacity",
StatsUtil.floatStr(nullToZero(boltAggregateStats.get_capacity())));
+ result.put("executeLatency",
StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
+ result.put("executed",
nullToZero(boltAggregateStats.get_executed()));
+ result.put("processLatency",
StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
+ result.put("acked", nullToZero(commonAggregateStats.get_acked()));
+ result.put("failed",
nullToZero(commonAggregateStats.get_failed()));
+ result.put("workerLogLink", getWorkerLogLink(host, port, config,
topologyId));
+ return result;
+ }
+
+ private static Map<String, Object> getSpoutExecutorStats(String
topologyId, Map<String, Object> config,
+
ExecutorAggregateStats executorAggregateStats) {
+ Map<String, Object> result = new HashMap();
+ ExecutorSummary executorSummary =
executorAggregateStats.get_exec_summary();
+ ExecutorInfo executorInfo = executorSummary.get_executor_info();
+ ComponentAggregateStats componentAggregateStats =
executorAggregateStats.get_stats();
+ SpecificAggregateStats specificAggregateStats =
componentAggregateStats.get_specific_stats();
+ SpoutAggregateStats spoutAggregateStats =
specificAggregateStats.get_spout();
+ CommonAggregateStats commonAggregateStats =
componentAggregateStats.get_common_stats();
+ String executorId = prettyExecutorInfo(executorInfo);
+ result.put("id", executorId);
+ result.put("encodedId", URLEncoder.encode(executorId));
+ result.put("uptime",
prettyUptimeSec(executorSummary.get_uptime_secs()));
+ result.put("uptimeSeconds", executorSummary.get_uptime_secs());
+ String host = executorSummary.get_host();
+ result.put("host", host);
+ int port = executorSummary.get_port();
+ result.put("port", port);
+ result.put("emitted",
nullToZero(commonAggregateStats.get_emitted()));
+ result.put("transferred",
nullToZero(commonAggregateStats.get_transferred()));
+ result.put("completeLatency",
StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms()));
+ result.put("acked", nullToZero(commonAggregateStats.get_acked()));
+ result.put("failed",
nullToZero(commonAggregateStats.get_failed()));
+ result.put("workerLogLink", getWorkerLogLink(host, port, config,
topologyId));
+ return result;
+ }
+
+ private static Map<String, Object> getComponentErrorInfo(ErrorInfo
errorInfo, Map config,
+ String
topologyId) {
+ Map<String, Object> result = new HashMap();
+ result.put("time", 1000 * (long) errorInfo.get_error_time_secs());
+ String host = errorInfo.get_host();
+ result.put("errorHost", host);
+ int port = errorInfo.get_port();
+ result.put("errorPort", port);
+ result.put("errorWorkerLogLink", getWorkerLogLink(host, port,
config, topologyId));
+ result.put("errorLapsedSecs", System.currentTimeMillis() / 1000 -
errorInfo.get_error_time_secs());
+ result.put("error", errorInfo.get_error());
+ return result;
+ }
+
+ private static Map<String, Object> getComponentErrors(List<ErrorInfo>
errorInfoList,
+ String
topologyId, Map config) {
+ Map<String, Object> result = new HashMap();
+
errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs));
+ result.put("componentErrors", errorInfoList.stream().map(
+ e -> getComponentErrorInfo(e, config, topologyId)
+ ).collect(Collectors.toList()));
+ return result;
+ }
+
+ private static Map<String, Object> getTopologyErrors(List<ErrorInfo>
errorInfoList,
+ String
topologyId, Map config) {
+ Map<String, Object> result = new HashMap();
+
errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs));
+ result.put("topologyErrors", errorInfoList.stream().map(
+ e -> getComponentErrorInfo(e, config, topologyId)
--- End diff --
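Line 1302: 'lambda arguments' has incorrect indentation level 16, expected level should be 12.
(This reads like a Checkstyle indentation warning; it appears to refer to the
`e -> getComponentErrorInfo(e, config, topologyId)` lambda at the end of the quoted diff.)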
---