Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2828#discussion_r217175834 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java --- @@ -1096,41 +1104,230 @@ public static String getWindowHint(String window) { result.put("transferred", commonAggregateStats.get_transferred()); result.put("acked", commonAggregateStats.get_acked()); result.put("failed", commonAggregateStats.get_failed()); - result.put( - "requestedMemOnHeap", - commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) - ); - result.put( - "requestedMemOffHeap", - commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME)); - result.put( - "requestedCpu" , - commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME)); + if (commonAggregateStats.is_set_resources_map()) { + result.put( + "requestedMemOnHeap", + commonAggregateStats.get_resources_map().get(Constants.COMMON_ONHEAP_MEMORY_RESOURCE_NAME) + ); + result.put( + "requestedMemOffHeap", + commonAggregateStats.get_resources_map().get(Constants.COMMON_OFFHEAP_MEMORY_RESOURCE_NAME)); + result.put( + "requestedCpu", + commonAggregateStats.get_resources_map().get(Constants.COMMON_CPU_RESOURCE_NAME)); + } return result; } + private static String getTruncatedErrorString(String errorString) { + return errorString.substring(0, Math.min(errorString.length(), 200)); + } + private static Map<String, Object> getSpoutAggStatsMap( - ComponentAggregateStats componentAggregateStats, String spoutId) { + ComponentAggregateStats componentAggregateStats, String window) { + Map<String, Object> result = new HashMap(); + SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout(); + CommonAggregateStats commonStats = componentAggregateStats.get_common_stats(); + result.putAll(getCommonAggStatsMap(commonStats)); + result.put("window", window); + result.put("windowPretty", getWindowHint(window)); + result.put("emitted", commonStats.get_emitted()); + result.put("transferred", commonStats.get_transferred()); + result.put("acked", commonStats.get_acked()); + result.put("failed", commonStats.get_failed()); + result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms()); + + + ErrorInfo lastError = componentAggregateStats.get_last_error(); + result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error())); + return result; + } + + private static Map<String, Object> getBoltAggStatsMap( + ComponentAggregateStats componentAggregateStats, String window) { + Map<String, Object> result = new HashMap(); + CommonAggregateStats commonStats = componentAggregateStats.get_common_stats(); + result.putAll(getCommonAggStatsMap(commonStats)); + result.put("window", window); + result.put("windowPretty", getWindowHint(window)); + result.put("emitted", commonStats.get_emitted()); + result.put("transferred", commonStats.get_transferred()); + result.put("acked", commonStats.get_acked()); + result.put("failed", commonStats.get_failed()); + BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt(); + result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms())); + result.put("executed", boltAggregateStats.get_executed()); + result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms())); + result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity())); + return result; + } + + private static Long nullToZero(Long value) { + return Objects.isNull(value) ? value : 0; + } + + private static Double nullToZero(Double value) { + return Objects.isNull(value) ? value : 0; + } + + private static Map<String, Object> getBoltInputStats(GlobalStreamId globalStreamId, + ComponentAggregateStats componentAggregateStats) { + Map<String, Object> result = new HashMap(); + SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats(); + BoltAggregateStats boltAggregateStats = specificAggregateStats.get_bolt(); + CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats(); + String componentId = globalStreamId.get_componentId(); + result.put("component", componentId); + result.put("encodedComponentId", URLEncoder.encode(componentId)); + result.put("stream", globalStreamId.get_streamId()); + result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms())); + result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms())); + result.put("executed", nullToZero(boltAggregateStats.get_executed())); + result.put("acked", nullToZero(commonAggregateStats.get_acked())); + result.put("failed", nullToZero(commonAggregateStats.get_failed())); + return result; + } + + private static Map<String, Object> getBoltOutputStats(String streamId, + ComponentAggregateStats componentAggregateStats) { + Map<String, Object> result = new HashMap(); + result.put("stream", streamId); + CommonAggregateStats commonStats = componentAggregateStats.get_common_stats(); + result.put("emitted", nullToZero(commonStats.get_emitted())); + result.put("transferred", nullToZero(commonStats.get_transferred())); + return result; + } + + private static Map<String, Object> getSpoutOutputStats(String streamId, + ComponentAggregateStats componentAggregateStats) { + SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats(); + SpoutAggregateStats spoutAggregateStats = specificAggregateStats.get_spout(); + Map<String, Object> result = new HashMap(); + result.put("stream", streamId); + CommonAggregateStats commonStats = componentAggregateStats.get_common_stats(); + result.put("emitted", nullToZero(commonStats.get_emitted())); + result.put("transferred", nullToZero(commonStats.get_transferred())); + result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms())); + result.put("acked", nullToZero(commonStats.get_acked())); + result.put("failed", nullToZero(commonStats.get_failed())); + return result; + } + + private static Map<String, Object> getBoltExecutorStats(String topologyId, Map<String, Object> config, + ExecutorAggregateStats executorAggregateStats) { + Map<String, Object> result = new HashMap(); + ExecutorSummary executorSummary = executorAggregateStats.get_exec_summary(); + ExecutorInfo executorInfo = executorSummary.get_executor_info(); + ComponentAggregateStats componentAggregateStats = executorAggregateStats.get_stats(); + SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats(); + BoltAggregateStats boltAggregateStats = specificAggregateStats.get_bolt(); + CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats(); + String executorId = prettyExecutorInfo(executorInfo); + result.put("id", executorId); + result.put("encodedId", URLEncoder.encode(executorId)); + result.put("uptime", prettyUptimeSec(executorSummary.get_uptime_secs())); + result.put("uptimeSeconds", executorSummary.get_uptime_secs()); + String host = executorSummary.get_host(); + result.put("host", host); + int port = executorSummary.get_port(); + result.put("port", port); + result.put("emitted", nullToZero(commonAggregateStats.get_emitted())); + result.put("transferred", nullToZero(commonAggregateStats.get_transferred())); + result.put("capacity", StatsUtil.floatStr(nullToZero(boltAggregateStats.get_capacity()))); + result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms())); + result.put("executed", nullToZero(boltAggregateStats.get_executed())); + result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms())); + result.put("acked", nullToZero(commonAggregateStats.get_acked())); + result.put("failed", nullToZero(commonAggregateStats.get_failed())); + result.put("workerLogLink", getWorkerLogLink(host, port, config, topologyId)); + return result; + } + + private static Map<String, Object> getSpoutExecutorStats(String topologyId, Map<String, Object> config, + ExecutorAggregateStats executorAggregateStats) { + Map<String, Object> result = new HashMap(); + ExecutorSummary executorSummary = executorAggregateStats.get_exec_summary(); + ExecutorInfo executorInfo = executorSummary.get_executor_info(); + ComponentAggregateStats componentAggregateStats = executorAggregateStats.get_stats(); + SpecificAggregateStats specificAggregateStats = componentAggregateStats.get_specific_stats(); + SpoutAggregateStats spoutAggregateStats = specificAggregateStats.get_spout(); + CommonAggregateStats commonAggregateStats = componentAggregateStats.get_common_stats(); + String executorId = prettyExecutorInfo(executorInfo); + result.put("id", executorId); + result.put("encodedId", URLEncoder.encode(executorId)); + result.put("uptime", prettyUptimeSec(executorSummary.get_uptime_secs())); + result.put("uptimeSeconds", executorSummary.get_uptime_secs()); + String host = executorSummary.get_host(); + result.put("host", host); + int port = executorSummary.get_port(); + result.put("port", port); + result.put("emitted", nullToZero(commonAggregateStats.get_emitted())); + result.put("transferred", nullToZero(commonAggregateStats.get_transferred())); + result.put("completeLatency", StatsUtil.floatStr(spoutAggregateStats.get_complete_latency_ms())); + result.put("acked", nullToZero(commonAggregateStats.get_acked())); + result.put("failed", nullToZero(commonAggregateStats.get_failed())); + result.put("workerLogLink", getWorkerLogLink(host, port, config, topologyId)); + return result; + } + + private static Map<String, Object> getComponentErrorInfo(ErrorInfo errorInfo, Map config, + String topologyId) { + Map<String, Object> result = new HashMap(); + result.put("time", 1000 * (long) errorInfo.get_error_time_secs()); + String host = errorInfo.get_host(); + result.put("errorHost", host); + int port = errorInfo.get_port(); + result.put("errorPort", port); + result.put("errorWorkerLogLink", getWorkerLogLink(host, port, config, topologyId)); + result.put("errorLapsedSecs", System.currentTimeMillis() / 1000 - errorInfo.get_error_time_secs()); + result.put("error", errorInfo.get_error()); + return result; + } + + private static Map<String, Object> getComponentErrors(List<ErrorInfo> errorInfoList, + String topologyId, Map config) { + Map<String, Object> result = new HashMap(); + errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs)); + result.put("componentErrors", errorInfoList.stream().map( + e -> getComponentErrorInfo(e, config, topologyId) + ).collect(Collectors.toList())); + return result; + } + + private static Map<String, Object> getTopologyErrors(List<ErrorInfo> errorInfoList, + String topologyId, Map config) { + Map<String, Object> result = new HashMap(); + errorInfoList.sort(Comparator.comparingInt(ErrorInfo::get_error_time_secs)); + result.put("topologyErrors", errorInfoList.stream().map( + e -> getComponentErrorInfo(e, config, topologyId) --- End diff -- 1302: 'lambda arguments' has incorrect indentation level 16, expected level should be 12.
---