[ https://issues.apache.org/jira/browse/STORM-1611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15186756#comment-15186756 ]
ASF GitHub Bot commented on STORM-1611: --------------------------------------- Github user abhishekagarwal87 commented on a diff in the pull request: https://github.com/apache/storm/pull/1195#discussion_r55485829 --- Diff: storm-core/src/jvm/org/apache/storm/pacemaker/Pacemaker.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.pacemaker; + +import org.apache.storm.generated.*; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J; + +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class Pacemaker implements IServerMessageHandler { + + private static final Logger LOG = LoggerFactory.getLogger(Pacemaker.class); + + private Map heartbeats; + private PacemakerStats pacemakerStats; + private Map conf; + private final long sleepSeconds = 60; + + private static class PacemakerStats { + public AtomicInteger sendPulseCount = new AtomicInteger(); + public AtomicInteger totalReceivedSize = new AtomicInteger(); + public AtomicInteger getPulseCount = new AtomicInteger(); + public AtomicInteger totalSentSize = new AtomicInteger(); + public AtomicInteger largestHeartbeatSize = new AtomicInteger(); + public AtomicInteger averageHeartbeatSize = new AtomicInteger(); + } + + public Pacemaker(Map conf) { + heartbeats = new ConcurrentHashMap(); + pacemakerStats = new PacemakerStats(); + this.conf = conf; + startStatsThread(); + } + + @Override + public HBMessage handleMessage(HBMessage m, boolean authenticated) { + HBMessage response = null; + HBMessageData data = m.get_data(); + switch (m.get_type()) { + case CREATE_PATH: + response = createPath(data.get_path()); + break; + case EXISTS: + response = exists(data.get_path(), authenticated); + break; + case SEND_PULSE: + response = sendPulse(data.get_pulse()); + break; + case GET_ALL_PULSE_FOR_PATH: + response = getAllPulseForPath(data.get_path(), authenticated); + break; + case GET_ALL_NODES_FOR_PATH: + response = getAllNodesForPath(data.get_path(), authenticated); + break; + case GET_PULSE: + response = 
getPulse(data.get_path(), authenticated); + break; + case DELETE_PATH: + response = deletePath(data.get_path()); + break; + case DELETE_PULSE_ID: + response = deletePulseId(data.get_path()); + break; + default: + LOG.info("Got Unexpected Type: {}", m.get_type()); + break; + } + if (response != null) + response.set_message_id(m.get_message_id()); + return response; + } + + private HBMessage createPath(String path) { + return new HBMessage(HBServerMessageType.CREATE_PATH_RESPONSE, null); + } + + private HBMessage exists(String path, boolean authenticated) { + HBMessage response = null; + if (authenticated) { + boolean itDoes = heartbeats.containsKey(path); + LOG.debug("Checking if path [ {} ] exists... {} .", path, itDoes); + response = new HBMessage(HBServerMessageType.EXISTS_RESPONSE, HBMessageData.boolval(itDoes)); + } else { + response = notAuthorized(); + } + return response; + } + + private HBMessage notAuthorized() { + return new HBMessage(HBServerMessageType.NOT_AUTHORIZED, null); + } + + private HBMessage sendPulse(HBPulse pulse) { + String id = pulse.get_id(); + byte[] details = pulse.get_details(); + LOG.debug("Saving Pulse for id [ {} ] data [ {} ].", id, details); + pacemakerStats.sendPulseCount.incrementAndGet(); + pacemakerStats.totalReceivedSize.addAndGet(details.length); + updateLargestHbSize(details.length); + updateAverageHbSize(details.length); + heartbeats.put(id, details); + return new HBMessage(HBServerMessageType.SEND_PULSE_RESPONSE, null); + } + + private HBMessage getAllPulseForPath(String path, boolean authenticated) { + if (authenticated) { + return new HBMessage(HBServerMessageType.GET_ALL_PULSE_FOR_PATH_RESPONSE, null); + } else { + return notAuthorized(); + } + } + + private HBMessage getAllNodesForPath(String path, boolean authenticated) { + LOG.debug("List all nodes for path {}", path); + if (authenticated) { + Set<String> pulseIds = new HashSet<>(); + for (Object key : heartbeats.keySet()) { + String k = (String) key; + String[] 
replaceStr = k.replaceFirst(path, "").split("/"); + String trimmmed = null; + for (String str : replaceStr) { + if (!str.equals("")) { + trimmmed = str; + break; + } + } + if (trimmmed != null && k.indexOf(path) == 0) { + pulseIds.add(trimmmed); + } + } + HBMessageData hbMessageData = HBMessageData.nodes(new HBNodes(new ArrayList(pulseIds))); + return new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE, hbMessageData); + } else { + return notAuthorized(); + } + } + + private HBMessage getPulse(String path, boolean authenticated) { + if (authenticated) { + byte[] details = (byte[]) heartbeats.get(path); + LOG.debug("Getting Pulse for path [ {} ]...data [ {} ].", path, details); + pacemakerStats.getPulseCount.incrementAndGet(); + if (details != null) { + pacemakerStats.totalSentSize.addAndGet(details.length); + } + HBPulse hbPulse = new HBPulse(); + hbPulse.set_id(path); + hbPulse.set_details(details); + return new HBMessage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse)); + } else { + return notAuthorized(); + } + } + + private HBMessage deletePath(String path) { + String prefix = path.endsWith("/") ? 
path : (path + "/"); + for (Object key : heartbeats.keySet()) { + if (((String) key).indexOf(prefix) == 0) + deletePulseId((String) key); + } + return new HBMessage(HBServerMessageType.DELETE_PATH_RESPONSE, null); + } + + private HBMessage deletePulseId(String path) { + LOG.debug("Deleting Pulse for id [ {} ].", path); + heartbeats.remove(path); + return new HBMessage(HBServerMessageType.DELETE_PULSE_ID_RESPONSE, null); + } + + private void updateLargestHbSize(int size) { + int newValue = size; + while (true) { + int oldValue = pacemakerStats.largestHeartbeatSize.get(); + if (newValue > oldValue) { + if (!pacemakerStats.largestHeartbeatSize.compareAndSet(oldValue, newValue)) + continue; + } + break; + } + } + + private void updateAverageHbSize(int size) { + int newValue = size; + while (true) { + int oldValue = pacemakerStats.averageHeartbeatSize.get(); + int count = pacemakerStats.sendPulseCount.get(); + newValue = ((count * oldValue) + newValue) / (count + 1); --- End diff -- The "value to add" (the incoming heartbeat size) should remain the same on every pass through the loop. Only the "value to set" (the newly computed average) should change. As written, newValue is overwritten with the computed average, so any later pass through the while loop would add the previous average instead of the original size; keep the size and the computed average in separate variables. > port org.apache.storm.pacemaker.pacemaker to java > ------------------------------------------------- > > Key: STORM-1611 > URL: https://issues.apache.org/jira/browse/STORM-1611 > Project: Apache Storm > Issue Type: New Feature > Reporter: John Fang > Assignee: John Fang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)