A basic alerting example
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/b1df294b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/b1df294b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/b1df294b Branch: refs/heads/helix-monitoring Commit: b1df294baa798f88898029d94f2fe8d165c91ebc Parents: 902e6fa Author: Kanak Biscuitwala <[email protected]> Authored: Fri Jan 17 18:02:27 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Fri Jan 17 18:02:27 2014 -0800 ---------------------------------------------------------------------- .../helix/monitoring/MonitoringEvent.java | 16 +- .../monitoring/RiemannMonitoringClient.java | 22 +- helix-monitor-server/pom.xml | 5 + .../src/main/resources/riemann.config | 9 +- .../monitoring/TestClientServerMonitoring.java | 222 +++++++++++++++++++ 5 files changed, 257 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java index 3735589..80006fb 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/MonitoringEvent.java @@ -40,6 +40,7 @@ public class MonitoringEvent { private ClusterId _clusterId; private ResourceId _resourceId; private PartitionId _partitionId; + private String _name; private String _host; private String _eventState; private String _description; @@ -56,9 +57,10 @@ public class MonitoringEvent { */ public MonitoringEvent() { _clusterId = null; - _host = null; _resourceId = null; _partitionId = null; + _name = null; + _host = null; _eventState = null; _description = null; _time = null; @@ -71,6 +73,16 @@ public class MonitoringEvent { } /** + * Give this event a name + * @param name the name + * @return MonitoringEvent + */ + public MonitoringEvent name(String name) { + _name = name; + return this; + } + + /** * Set the cluster this event corresponds to * @param clusterId the cluster id * @return MonitoringEvent @@ -257,7 +269,7 @@ public class MonitoringEvent { if (_partitionId == null) { _partitionId = PartitionId.from("%"); } - return String.format("%s|%s|%s", _clusterId, _resourceId, _partitionId); + return String.format("%s|%s|%s|%s", _clusterId, _resourceId, _partitionId, _name); } String eventState() { http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java ---------------------------------------------------------------------- diff --git a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java index 1948308..20b0825 100644 --- a/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java +++ b/helix-monitor-client/src/main/java/org/apache/helix/monitoring/RiemannMonitoringClient.java @@ -311,35 +311,35 @@ public class RiemannMonitoringClient implements MonitoringClient { private EventDSL convertEvent(AbstractRiemannClient c, MonitoringEvent helixEvent) { EventDSL event = c.event(); if (helixEvent.host() != null) { - event = event.host(helixEvent.host()); + event.host(helixEvent.host()); } if (helixEvent.service() != null) { - event = event.service(helixEvent.service()); + event.service(helixEvent.service()); } if (helixEvent.eventState() != null) { - event = event.state(helixEvent.eventState()); + event.state(helixEvent.eventState()); } if (helixEvent.description() != null) { - event = event.description(helixEvent.description()); + event.description(helixEvent.description()); } if (helixEvent.time() != null) { - event = event.time(helixEvent.time()); + event.time(helixEvent.time()); } if (helixEvent.ttl() != null) { - event = event.ttl(helixEvent.ttl()); + event.ttl(helixEvent.ttl()); } if (helixEvent.longMetric() != null) { - event = event.metric(helixEvent.longMetric()); + event.metric(helixEvent.longMetric()); } else if (helixEvent.floatMetric() != null) { - event = event.metric(helixEvent.floatMetric()); + event.metric(helixEvent.floatMetric()); } else if (helixEvent.doubleMetric() != null) { - event = event.metric(helixEvent.doubleMetric()); + event.metric(helixEvent.doubleMetric()); } if (!helixEvent.tags().isEmpty()) { - event = event.tags(helixEvent.tags()); + event.tags(helixEvent.tags()); } if (!helixEvent.attributes().isEmpty()) { - event = event.attributes(helixEvent.attributes()); + event.attributes.putAll(helixEvent.attributes()); } return event; } http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/pom.xml ---------------------------------------------------------------------- diff --git a/helix-monitor-server/pom.xml b/helix-monitor-server/pom.xml index a703e0e..d6fdf52 100644 --- a/helix-monitor-server/pom.xml +++ b/helix-monitor-server/pom.xml @@ -44,6 +44,11 @@ under the License. <artifactId>helix-core</artifactId> </dependency> <dependency> + <groupId>org.apache.helix</groupId> + <artifactId>helix-monitor-client</artifactId> + <version>0.7.1-incubating-SNAPSHOT</version> + </dependency> + <dependency> <groupId>riemann</groupId> <artifactId>riemann</artifactId> <version>0.2.4</version> http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/main/resources/riemann.config ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/main/resources/riemann.config b/helix-monitor-server/src/main/resources/riemann.config index 08c3bce..0f06dd0 100644 --- a/helix-monitor-server/src/main/resources/riemann.config +++ b/helix-monitor-server/src/main/resources/riemann.config @@ -20,13 +20,13 @@ (logging/init :file "/dev/null") -(tcp-server) +(tcp-server :host "0.0.0.0") (instrumentation {:interval 1}) -(udp-server) -(ws-server) -(repl-server) +(udp-server :host "0.0.0.0") +(ws-server :host "0.0.0.0") +(repl-server :host "0.0.0.0") (periodically-expire 1) @@ -34,3 +34,4 @@ (streams (expired prn) index)) + http://git-wip-us.apache.org/repos/asf/helix/blob/b1df294b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java ---------------------------------------------------------------------- diff --git a/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java new file mode 100644 index 0000000..8b7f839 --- /dev/null +++ b/helix-monitor-server/src/test/java/org/apache/helix/monitoring/TestClientServerMonitoring.java @@ -0,0 +1,222 @@ +package org.apache.helix.monitoring; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.net.InetAddress; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.api.id.ClusterId; +import org.apache.helix.api.id.ParticipantId; +import org.apache.helix.api.id.PartitionId; +import org.apache.helix.api.id.ResourceId; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Leader; +import org.apache.helix.model.MonitoringConfig; +import org.apache.helix.spectator.RoutingTableProvider; +import org.junit.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Maps; + +public class TestClientServerMonitoring extends ZkUnitTestBase { + @Test + public void testMonitoring() throws Exception { + final int NUM_PARTICIPANTS = 4; + final int NUM_PARTITIONS = 8; + final int NUM_REPLICAS = 2; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "MasterSlave", // pick a built-in state model + RebalanceMode.FULL_AUTO, // let Helix handle rebalancing + true); // do rebalance + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + participants[i] = + new MockParticipantManager(ZK_ADDR, clusterName, "localhost_" + (12918 + i)); + participants[i].syncStart(); + } + HelixDataAccessor accessor = participants[0].getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // set a custom monitoring config + MonitoringConfig monitoringConfig = new MonitoringConfig("sampleMonitoringConfig"); + monitoringConfig.setConfig(getMonitoringConfigString()); + accessor.setProperty(keyBuilder.monitoringConfig("sampleMonitoringConfig"), monitoringConfig); + + // start controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.registerMonitoringServer(new RiemannMonitoringServer(InetAddress.getLocalHost() + .getHostName())); + controller.syncStart(); + + // make sure the leader has registered and is showing the server port + Leader leader = accessor.getProperty(keyBuilder.controllerLeader()); + Assert.assertNotNull(leader); + Assert.assertNotEquals(leader.getMonitoringPort(), -1); + Assert.assertNotNull(leader.getMonitoringHost()); + + // run the spectator + spectate(clusterName, "TestDB0", NUM_PARTITIONS); + + // stop participants + for (MockParticipantManager participant : participants) { + participant.syncStop(); + } + + // stop controller + controller.syncStop(); + } + + private String getMonitoringConfigString() { + StringBuilder sb = + new StringBuilder() + .append("(defn parse-int\r\n") + .append( + " \"Convert a string to an integer\"\r\n [instr]\r\n (Integer/parseInt instr))\r\n\r\n") + .append("(defn parse-double\r\n \"Convert a string into a double\"\r\n [instr]\r\n") + .append(" (Double/parseDouble instr))\r\n\r\n(defn check-failure-rate\r\n") + .append( + " \"Check if the event should trigger an alarm based on failure rate\"\r\n [e]\r\n") + .append( + " (let [writeCount (parse-int (:writeCount e)) failedCount (parse-int (:failedCount e))]\r\n") + .append( + " (if (> writeCount 0)\r\n (let [ratio (double (/ failedCount writeCount))]\r\n") + .append(" (if (> ratio 0.1) ; Report if the failure count exceeds 10%\r\n") + .append( + " (prn (:host e) \"has an unacceptable failure rate of\" ratio))))))\r\n\r\n") + .append( + "(defn check-95th-latency\r\n \"Check if the 95th percentile latency is within expectations\"\r\n") + .append(" [e]\r\n (let [latency (parse-double (:latency95 e))]\r\n") + .append( + " (if (> latency 1.0) ; Report if the 95th percentile latency exceeds 1.0s\r\n") + .append( + " (prn (:host e) \"has an unacceptable 95th percentile latency of\" latency))))\r\n\r\n") + .append("(streams\r\n (where\r\n (service #\".*LatencyReport.*\")") + .append( + " ; Only process services containing LatencyReport\r\n check-failure-rate\r\n") + .append(" check-95th-latency))"); + return sb.toString(); + } + + private void spectate(final String clusterName, final String resourceName, final int numPartitions) + throws Exception { + final Random random = new Random(); + final ClusterId clusterId = ClusterId.from(clusterName); + final ResourceId resourceId = ResourceId.from(resourceName); + + // Connect to Helix + final HelixManager manager = + HelixManagerFactory.getZKHelixManager(clusterName, null, InstanceType.SPECTATOR, ZK_ADDR); + manager.connect(); + + // Attach a monitoring client to this connection + final MonitoringClient client = + new RiemannMonitoringClient(clusterId, manager.getHelixDataAccessor()); + client.connect(); + + // Start spectating + final RoutingTableProvider routingTableProvider = new RoutingTableProvider(); + manager.addExternalViewChangeListener(routingTableProvider); + + // Send some metrics + client.every(5, 0, TimeUnit.SECONDS, new Runnable() { + @Override + public void run() { + Map<ParticipantId, Integer> writeCounts = Maps.newHashMap(); + Map<ParticipantId, Integer> failedCounts = Maps.newHashMap(); + Map<ParticipantId, Double> latency95Map = Maps.newHashMap(); + for (int i = 0; i < numPartitions; i++) { + // Figure out who hosts what + PartitionId partitionId = PartitionId.from(resourceId, i + ""); + List<InstanceConfig> instances = + routingTableProvider.getInstances(resourceName, partitionId.stringify(), "MASTER"); + if (instances.size() < 1) { + continue; + } + + // Normally you would get these attributes by using a CallTracker + ParticipantId participantId = instances.get(0).getParticipantId(); + int writeCount = random.nextInt(1000) + 10; + if (!writeCounts.containsKey(participantId)) { + writeCounts.put(participantId, writeCount); + } else { + writeCounts.put(participantId, writeCounts.get(participantId) + writeCount); + } + int failedCount = i != 0 ? 0 : writeCount / 2; // bad write count from p0 master + if (!failedCounts.containsKey(participantId)) { + failedCounts.put(participantId, failedCount); + } else { + failedCounts.put(participantId, failedCounts.get(participantId) + failedCount); + } + double latency = (i != 1) ? 0.001 : 5.000; // bad 95th latency from p1 master + latency95Map.put(participantId, latency); + } + + // Send everything grouped by participant + for (ParticipantId participantId : writeCounts.keySet()) { + Map<String, String> attributes = Maps.newHashMap(); + attributes.put("writeCount", writeCounts.get(participantId) + ""); + attributes.put("failedCount", failedCounts.get(participantId) + ""); + attributes.put("latency95", latency95Map.get(participantId) + ""); + + // Send an event with a ttl long enough to span the send interval + MonitoringEvent e = + new MonitoringEvent().cluster(clusterId).resource(resourceId) + .participant(participantId).name("LatencyReport").attributes(attributes) + .eventState("update").ttl(10.0f); + client.send(e, false); + } + } + }); + Thread.sleep(60000); + client.disconnect(); + manager.disconnect(); + } + +}
