Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218086663
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java 
---
    @@ -0,0 +1,202 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.stats;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import org.apache.storm.generated.ClusterWorkerHeartbeat;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.ExecutorStats;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.shade.com.google.common.collect.Lists;
    +import org.apache.storm.utils.Time;
    +
    +/**
    + * Stats calculations needed by storm client code.
    + */
    +public class ClientStatsUtil {
    +    public static final String SPOUT = "spout";
    +    public static final String BOLT = "bolt";
    +    static final String EXECUTOR_STATS = "executor-stats";
    +    static final String UPTIME = "uptime";
    +    public static final String TIME_SECS = "time-secs";
    +    public static final ToGlobalStreamIdTransformer TO_GSID = new 
ToGlobalStreamIdTransformer();
    +    public static final IdentityTransformer IDENTITY = new 
IdentityTransformer();
    +
    +    /**
    +     * Convert a List<Long> executor to java List<Integer>.
    +     */
    +    public static List<Integer> convertExecutor(List<Long> executor) {
    +        return Lists.newArrayList(executor.get(0).intValue(), 
executor.get(1).intValue());
    +    }
    +
    +    /**
    +     * Make and map of executors to empty stats.
    +     * @param executors the executors as keys of the map.
    +     * @return and empty map of executors to stats.
    +     */
    +    public static Map<List<Integer>, ExecutorStats> 
mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
    +        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
    +        for (Object executor : executors) {
    +            List startEnd = (List) executor;
    +            ret.put(convertExecutor(startEnd), null);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Convert Long Executor Ids in ZkHbs to Integer ones structure to 
java maps.
    +     */
    +    public static Map<List<Integer>, ExecutorStats> 
convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
    +        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
    +        for (Map.Entry<List<Long>, ExecutorStats> entry : 
executorBeats.entrySet()) {
    +            ret.put(convertExecutor(entry.getKey()), entry.getValue());
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Create a new worker heartbeat for zookeeper.
    +     * @param topoId the topology id
    +     * @param executorStats the stats for the executors
    +     * @param uptime the uptime for the worker.
    +     * @return the heartbeat map.
    +     */
    +    public static Map<String, Object> mkZkWorkerHb(String topoId, 
Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
    +        Map<String, Object> ret = new HashMap<>();
    +        ret.put("storm-id", topoId);
    +        ret.put(EXECUTOR_STATS, executorStats);
    +        ret.put(UPTIME, uptime);
    +        ret.put(TIME_SECS, Time.currentTimeSecs());
    +
    +        return ret;
    +    }
    +
    +    private static Number getByKeyOr0(Map<String, Object> m, String k) {
    +        if (m == null) {
    +            return 0;
    +        }
    +
    +        Number n = (Number) m.get(k);
    +        if (n == null) {
    +            return 0;
    +        }
    +        return n;
    +    }
    +
    +    /**
    +     * Get a sub-map by a given key.
    +     * @param map the original map
    +     * @param key the key to get it from.
    +     * @return the map stored under key.
    +     */
    +    public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
    +        if (map == null) {
    +            return null;
    +        }
    +        return (Map<K, V>) map.get(key);
    +    }
    +
    +    public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, 
Object> heartbeat) {
    +        ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
    +        ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
    +        ret.set_storm_id((String) heartbeat.get("storm-id"));
    +        ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
    +
    +        Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
    +
    +        Map<List<Integer>, ExecutorStats> executorStats = 
getMapByKey(heartbeat, EXECUTOR_STATS);
    +        if (executorStats != null) {
    +            for (Map.Entry<List<Integer>, ExecutorStats> entry : 
executorStats.entrySet()) {
    +                List<Integer> executor = entry.getKey();
    +                ExecutorStats stats = entry.getValue();
    +                if (null != stats) {
    +                    convertedStats.put(new ExecutorInfo(executor.get(0), 
executor.get(1)), stats);
    +                }
    +            }
    +        }
    +        ret.set_executor_stats(convertedStats);
    +
    +        return ret;
    +    }
    +
    +    /**
    +     * Converts stats to be over given windows of time.
    +     * @param stats the stats
    +     * @param secKeyFunc transform the sub-key
    +     * @param firstKeyFunc transform the main key
    +     */
    +    public static <K1, K2> Map windowSetConverter(
    +        Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> 
firstKeyFunc) {
    +        Map ret = new HashMap();
    +
    +        for (Object o : stats.entrySet()) {
    +            Map.Entry entry = (Map.Entry) o;
    +            K1 key1 = firstKeyFunc.transform(entry.getKey());
    +
    +            Map subRetMap = (Map) ret.get(key1);
    +            if (subRetMap == null) {
    +                subRetMap = new HashMap();
    +            }
    +            ret.put(key1, subRetMap);
    +
    +            Map value = (Map) entry.getValue();
    +            for (Object oo : value.entrySet()) {
    +                Map.Entry subEntry = (Map.Entry) oo;
    +                K2 key2 = secKeyFunc.transform(subEntry.getKey());
    +                subRetMap.put(key2, subEntry.getValue());
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    // 
=====================================================================================
    +    // key transformers
    +    // 
=====================================================================================
    +
    +    /**
    +     * Provides a way to transform one key into another.
    +     * @param <T>
    +     */
    +    interface KeyTransformer<T> {
    +        T transform(Object key);
    +    }
    +
    +    static class ToGlobalStreamIdTransformer implements 
KeyTransformer<GlobalStreamId> {
    +        @Override
    +        public GlobalStreamId transform(Object key) {
    +            if (key instanceof List) {
    +                List l = (List) key;
    +                if (l.size() > 1) {
    +                    return new GlobalStreamId((String) l.get(0), (String) 
l.get(1));
    +                }
    +            }
    +            return new GlobalStreamId("", key.toString());
    --- End diff --
    
    Should this be marked to _DEFAULT_STREAM_ID_ or is this expected to be 
ignored?


---

Reply via email to