Github user kishorvpatil commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218086663
--- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java
---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.stats;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.apache.storm.utils.Time;
+
+/**
+ * Stats calculations needed by storm client code.
+ */
+public class ClientStatsUtil {
+ public static final String SPOUT = "spout";
+ public static final String BOLT = "bolt";
+ static final String EXECUTOR_STATS = "executor-stats";
+ static final String UPTIME = "uptime";
+ public static final String TIME_SECS = "time-secs";
+ public static final ToGlobalStreamIdTransformer TO_GSID = new
ToGlobalStreamIdTransformer();
+ public static final IdentityTransformer IDENTITY = new
IdentityTransformer();
+
+ /**
+ * Convert a List<Long> executor to java List<Integer>.
+ */
+ public static List<Integer> convertExecutor(List<Long> executor) {
+ return Lists.newArrayList(executor.get(0).intValue(),
executor.get(1).intValue());
+ }
+
+ /**
+ * Make a map of executors to empty stats.
+ * @param executors the executors as keys of the map.
+ * @return a map with every executor as a key and no (null) stats as its value.
+ */
+ public static Map<List<Integer>, ExecutorStats>
mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Object executor : executors) {
+ List startEnd = (List) executor;
+ ret.put(convertExecutor(startEnd), null);
+ }
+ return ret;
+ }
+
+ /**
+ * Convert the Long executor ids used as keys in ZkHbs to Integer ones,
+ * returning the result as a new java map.
+ */
+ public static Map<List<Integer>, ExecutorStats>
convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
+ Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+ for (Map.Entry<List<Long>, ExecutorStats> entry :
executorBeats.entrySet()) {
+ ret.put(convertExecutor(entry.getKey()), entry.getValue());
+ }
+ return ret;
+ }
+
+ /**
+ * Create a new worker heartbeat for zookeeper.
+ * @param topoId the topology id
+ * @param executorStats the stats for the executors
+ * @param uptime the uptime for the worker.
+ * @return the heartbeat map.
+ */
+ public static Map<String, Object> mkZkWorkerHb(String topoId,
Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
+ Map<String, Object> ret = new HashMap<>();
+ ret.put("storm-id", topoId);
+ ret.put(EXECUTOR_STATS, executorStats);
+ ret.put(UPTIME, uptime);
+ ret.put(TIME_SECS, Time.currentTimeSecs());
+
+ return ret;
+ }
+
+ private static Number getByKeyOr0(Map<String, Object> m, String k) {
+ if (m == null) {
+ return 0;
+ }
+
+ Number n = (Number) m.get(k);
+ if (n == null) {
+ return 0;
+ }
+ return n;
+ }
+
+ /**
+ * Get a sub-map by a given key.
+ * @param map the original map
+ * @param key the key to get it from.
+ * @return the map stored under key.
+ */
+ public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
+ if (map == null) {
+ return null;
+ }
+ return (Map<K, V>) map.get(key);
+ }
+
+ public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String,
Object> heartbeat) {
+ ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
+ ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
+ ret.set_storm_id((String) heartbeat.get("storm-id"));
+ ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
+
+ Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
+
+ Map<List<Integer>, ExecutorStats> executorStats =
getMapByKey(heartbeat, EXECUTOR_STATS);
+ if (executorStats != null) {
+ for (Map.Entry<List<Integer>, ExecutorStats> entry :
executorStats.entrySet()) {
+ List<Integer> executor = entry.getKey();
+ ExecutorStats stats = entry.getValue();
+ if (null != stats) {
+ convertedStats.put(new ExecutorInfo(executor.get(0),
executor.get(1)), stats);
+ }
+ }
+ }
+ ret.set_executor_stats(convertedStats);
+
+ return ret;
+ }
+
+ /**
+ * Converts stats to be over given windows of time.
+ * @param stats the stats
+ * @param secKeyFunc transform the sub-key
+ * @param firstKeyFunc transform the main key
+ */
+ public static <K1, K2> Map windowSetConverter(
+ Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1>
firstKeyFunc) {
+ Map ret = new HashMap();
+
+ for (Object o : stats.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
+ K1 key1 = firstKeyFunc.transform(entry.getKey());
+
+ Map subRetMap = (Map) ret.get(key1);
+ if (subRetMap == null) {
+ subRetMap = new HashMap();
+ }
+ ret.put(key1, subRetMap);
+
+ Map value = (Map) entry.getValue();
+ for (Object oo : value.entrySet()) {
+ Map.Entry subEntry = (Map.Entry) oo;
+ K2 key2 = secKeyFunc.transform(subEntry.getKey());
+ subRetMap.put(key2, subEntry.getValue());
+ }
+ }
+ return ret;
+ }
+
+ //
=====================================================================================
+ // key transformers
+ //
=====================================================================================
+
+ /**
+ * Provides a way to transform one key into another.
+ * @param <T>
+ */
+ interface KeyTransformer<T> {
+ T transform(Object key);
+ }
+
+ static class ToGlobalStreamIdTransformer implements
KeyTransformer<GlobalStreamId> {
+ @Override
+ public GlobalStreamId transform(Object key) {
+ if (key instanceof List) {
+ List l = (List) key;
+ if (l.size() > 1) {
+ return new GlobalStreamId((String) l.get(0), (String)
l.get(1));
+ }
+ }
+ return new GlobalStreamId("", key.toString());
--- End diff --
Should this fallback be set to _DEFAULT_STREAM_ID_, or is the value expected
to be ignored?
---