http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java new file mode 100644 index 0000000..709db27 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.metric; + +import com.alibaba.jstorm.common.metric.AsmMetric; + +import java.io.Serializable; +import java.util.Map; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public interface AsmMetricSet extends Serializable { + Map<String, AsmMetric> getMetrics(); +}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * The metric time windows known to this package, expressed in seconds, together with
 * their {@code <d>d<h>h<m>m<s>s} display labels.
 *
 * @author Cody ([email protected])
 * @since 2.0.5
 */
public class AsmWindow {
    public static final Integer M1_WINDOW = 60;
    public static final Integer M10_WINDOW = 600;
    public static final Integer H2_WINDOW = 7200;
    public static final Integer D1_WINDOW = 86400;

    public static final String M1_WINDOW_STR = "0d0h1m0s";
    public static final String M10_WINDOW_STR = "0d0h10m0s";
    public static final String H2_WINDOW_STR = "0d2h0m0s";
    public static final String D1_WINDOW_STR = "1d0h0m0s";

    /**
     * All known windows in ascending order. The set is read-only: this is shared global
     * state, so attempts to add or remove windows throw
     * {@link UnsupportedOperationException} instead of silently changing it for everyone.
     */
    public static final Set<Integer> TIME_WINDOWS;

    /** Window length -> display label; filled once in the static initializer, never mutated. */
    private static final Map<Integer, String> WIN_TO_STR = new HashMap<Integer, String>();

    static {
        WIN_TO_STR.put(M1_WINDOW, M1_WINDOW_STR);
        WIN_TO_STR.put(M10_WINDOW, M10_WINDOW_STR);
        WIN_TO_STR.put(H2_WINDOW, H2_WINDOW_STR);
        WIN_TO_STR.put(D1_WINDOW, D1_WINDOW_STR);

        // TreeSet keeps the windows sorted; the wrapper only blocks mutation.
        TIME_WINDOWS = Collections.unmodifiableSet(new TreeSet<Integer>(WIN_TO_STR.keySet()));
    }

    /**
     * Looks up the display label of a window.
     *
     * @param win window length in seconds; may be {@code null}
     * @return the label, or {@code null} when {@code win} is not one of the known windows
     */
    public static String win2str(Integer win) {
        return WIN_TO_STR.get(win);
    }
}

// Patch text for the next file in this archive, kept verbatim because it continues on the following line:
// http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/Bytes.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/Bytes.java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

/**
 * Static helpers that convert primitives and {@link BigDecimal}s to and from big-endian
 * byte arrays, render byte arrays as printable strings, and compare or hash byte ranges.
 */
public class Bytes {

    /** Maps every byte value 0-255 onto exactly one char, which is what the printer needs. */
    private static final Charset ISO_8859_1 = Charset.forName("ISO-8859-1");

    /** Characters, besides ASCII letters and digits, that are printed without escaping. */
    private static final String PRINTABLE_PUNCTUATION = " `~!@#$%^&*()-_=+[]{}\\|;:'\",.<>/?";

    /** Size of boolean in bytes */
    public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;

    /** Size of byte in bytes */
    public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;

    /** Size of char in bytes */
    public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE;

    /** Size of double in bytes */
    public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE;

    /** Size of float in bytes */
    public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE;

    /** Size of int in bytes */
    public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE;

    /** Size of long in bytes */
    public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE;

    /** Size of short in bytes */
    public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE;

    /**
     * Estimate of size cost to pay beyond payload in jvm for instance of byte [].
     * Estimate based on study of jhat and jprofiler numbers.
     */
    // JHat says BU is 56 bytes.
    // SizeOf which uses java.lang.instrument says 24 bytes. (3 longs?)
    public static final int ESTIMATED_HEAP_TAX = 16;

    /**
     * Copies a range of one array into another.
     *
     * @param tgtBytes  the array written to
     * @param tgtOffset first index written in {@code tgtBytes}
     * @param srcBytes  the array read from
     * @param srcOffset first index read in {@code srcBytes}
     * @param srcLength number of bytes copied
     * @return {@code tgtOffset + srcLength}
     */
    public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes,
                               int srcOffset, int srcLength) {
        System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength);
        return tgtOffset + srcLength;
    }

    /**
     * Writes a single byte.
     *
     * @param bytes  the array written to
     * @param offset index written
     * @param b      byte to store
     * @return {@code offset + 1}
     */
    public static int putByte(byte[] bytes, int offset, byte b) {
        bytes[offset] = b;
        return offset + 1;
    }

    /**
     * Copies {@code bb.limit()} bytes out of the buffer's backing array, starting at its
     * array offset. The buffer's position is ignored and the buffer is not modified.
     *
     * @param bb a buffer whose backing array is accessible via {@link ByteBuffer#array()}
     * @return a new array holding the copied bytes
     */
    public static byte[] toBytes(ByteBuffer bb) {
        int length = bb.limit();
        byte[] result = new byte[length];
        System.arraycopy(bb.array(), bb.arrayOffset(), result, 0, length);
        return result;
    }

    /**
     * Returns a copy of {@code length} bytes of {@code bytes} starting at {@code offset}.
     *
     * @throws IllegalArgumentException if the range runs past the end of the array
     */
    public static byte[] copyBytes(final byte[] bytes, int offset, int length) {
        if (offset + length > bytes.length) {
            throw explainWrongLengthOrOffset(bytes, offset, length, length);
        }
        byte[] result = new byte[length];
        System.arraycopy(bytes, offset, result, 0, length);
        return result;
    }

    /**
     * Write a printable representation of a byte array.
     *
     * @param b byte array, may be {@code null}
     * @return the printable form, or the string {@code "null"} for a null array
     * @see #toStringBinary(byte[], int, int)
     */
    public static String toStringBinary(final byte[] b) {
        if (b == null) {
            return "null";
        }
        return toStringBinary(b, 0, b.length);
    }

    /**
     * Converts the given byte buffer, from its array offset to its limit, to
     * a string. The position and the mark are ignored.
     *
     * @param buf a byte buffer, may be {@code null}
     * @return a string representation of the buffer's binary contents
     */
    public static String toStringBinary(ByteBuffer buf) {
        if (buf == null) {
            return "null";
        }
        return toStringBinary(buf.array(), buf.arrayOffset(), buf.limit());
    }

    /**
     * Write a printable representation of a byte array. ASCII letters, digits and common
     * punctuation are emitted as-is; every other byte is hex escaped as {@code \xHH}
     * (upper-case digits), eg: \x00 \x05 etc
     *
     * @param b   array to write out
     * @param off offset to start at
     * @param len length to write
     * @return string output
     */
    public static String toStringBinary(final byte[] b, int off, int len) {
        // Decoding with a Charset object (rather than a charset name) cannot fail, so no
        // checked exception has to be caught and no partial result can be returned.
        String decoded = new String(b, off, len, ISO_8859_1);
        StringBuilder result = new StringBuilder(decoded.length());
        for (int i = 0; i < decoded.length(); ++i) {
            char c = decoded.charAt(i);
            boolean printable = (c >= '0' && c <= '9')
                    || (c >= 'A' && c <= 'Z')
                    || (c >= 'a' && c <= 'z')
                    || PRINTABLE_PUNCTUATION.indexOf(c) >= 0;
            if (printable) {
                result.append(c);
            } else {
                result.append(String.format("\\x%02X", (int) c));
            }
        }
        return result.toString();
    }

    /** True for the characters {@code 0-9} and upper-case {@code A-F}. */
    private static boolean isHexDigit(char c) {
        return (c >= 'A' && c <= 'F') || (c >= '0' && c <= '9');
    }

    /**
     * Takes a ASCII digit in the range A-F0-9 and returns
     * the corresponding integer/ordinal value.
     *
     * @param ch The hex digit.
     * @return The converted hex value as a byte.
     */
    public static byte toBinaryFromHex(byte ch) {
        if (ch >= 'A' && ch <= 'F') {
            return (byte) ((byte) 10 + (byte) (ch - 'A'));
        }
        return (byte) (ch - '0');
    }

    /**
     * Parses the output format of {@link #toStringBinary(byte[], int, int)}: {@code \xHH}
     * becomes the byte {@code 0xHH}, every other char is narrowed to a byte.
     * <p>
     * A {@code \x} that is not followed by two more characters is copied literally. A
     * {@code \x} followed by two characters that are not both upper-case hex digits loses
     * its backslash and the remaining characters are copied literally.
     *
     * @param in the escaped string
     * @return the decoded bytes
     */
    public static byte[] toBytesBinary(String in) {
        // this may be bigger than we need, but lets be safe.
        byte[] b = new byte[in.length()];
        int size = 0;
        for (int i = 0; i < in.length(); ++i) {
            char ch = in.charAt(i);
            // Require room for both hex digits; reading them unchecked used to throw
            // StringIndexOutOfBoundsException for input ending in "\x" or "\xH".
            if (ch == '\\' && in.length() > i + 3 && in.charAt(i + 1) == 'x') {
                char hd1 = in.charAt(i + 2);
                char hd2 = in.charAt(i + 3);
                if (!isHexDigit(hd1) || !isHexDigit(hd2)) {
                    // bogus escape code, ignore:
                    continue;
                }
                // turn hex ASCII digit -> number
                b[size++] = (byte) ((toBinaryFromHex((byte) hd1) << 4) + toBinaryFromHex((byte) hd2));
                i += 3; // skip 'x' and the two digits
            } else {
                b[size++] = (byte) ch;
            }
        }
        byte[] trimmed = new byte[size];
        System.arraycopy(b, 0, trimmed, 0, size);
        return trimmed;
    }

    /**
     * Convert a boolean to a byte array. True becomes -1
     * and false becomes 0.
     *
     * @param b value
     * @return <code>b</code> encoded in a byte array.
     */
    public static byte[] toBytes(final boolean b) {
        return new byte[]{b ? (byte) -1 : (byte) 0};
    }

    /**
     * Reverses {@link #toBytes(boolean)}
     *
     * @param b array of exactly one byte
     * @return false if the byte is 0, true otherwise
     * @throws IllegalArgumentException if the array length is not 1
     */
    public static boolean toBoolean(final byte[] b) {
        if (b.length != 1) {
            throw new IllegalArgumentException("Array has wrong size: " + b.length);
        }
        return b[0] != (byte) 0;
    }

    /**
     * Reads a boolean from one byte of an array.
     *
     * @throws IllegalArgumentException if length is not {@link #SIZEOF_BOOLEAN} or the
     *                                  range runs past the end of the array
     */
    public static boolean toBoolean(final byte[] bytes, int offset, int length) {
        requireExactRange(bytes, offset, length, SIZEOF_BOOLEAN);
        return bytes[offset] != (byte) 0;
    }

    /**
     * Convert a long value to a byte array using big-endian.
     *
     * @param val value to convert
     * @return the byte array
     */
    public static byte[] toBytes(long val) {
        byte[] b = new byte[SIZEOF_LONG];
        putLong(b, 0, val);
        return b;
    }

    /**
     * Converts a byte array to a long value. Reverses
     * {@link #toBytes(long)}
     *
     * @param bytes array
     * @return the long value
     */
    public static long toLong(byte[] bytes) {
        return toLong(bytes, 0, SIZEOF_LONG);
    }

    /**
     * Converts a byte array to a long value. Assumes there will be
     * {@link #SIZEOF_LONG} bytes available.
     *
     * @param bytes  bytes
     * @param offset offset
     * @return the long value
     */
    public static long toLong(byte[] bytes, int offset) {
        return toLong(bytes, offset, SIZEOF_LONG);
    }

    /**
     * Converts a byte array to a long value.
     *
     * @param bytes  array of bytes
     * @param offset offset into array
     * @param length length of data (must be {@link #SIZEOF_LONG})
     * @return the long value
     * @throws IllegalArgumentException if length is not {@link #SIZEOF_LONG} or
     *                                  if there's not enough room in the array at the offset indicated.
     */
    public static long toLong(byte[] bytes, int offset, final int length) {
        requireExactRange(bytes, offset, length, SIZEOF_LONG);
        long l = 0;
        for (int i = offset; i < offset + length; i++) {
            l <<= 8;
            l ^= bytes[i] & 0xFF;
        }
        return l;
    }

    /**
     * Validates a fixed-width read. The comparison is written as
     * {@code offset > bytes.length - length} because {@code offset + length} overflows for
     * offsets near {@link Integer#MAX_VALUE}, which used to let the read through and
     * return 0.
     */
    private static void requireExactRange(byte[] bytes, int offset, int length, int expectedLength) {
        if (length != expectedLength || offset > bytes.length - length) {
            throw explainWrongLengthOrOffset(bytes, offset, length, expectedLength);
        }
    }

    /** Builds the exception describing why a (offset, length) pair was rejected. */
    private static IllegalArgumentException explainWrongLengthOrOffset(final byte[] bytes,
                                                                       final int offset,
                                                                       final int length,
                                                                       final int expectedLength) {
        String reason;
        if (length != expectedLength) {
            reason = "Wrong length: " + length + ", expected " + expectedLength;
        } else {
            reason = "offset (" + offset + ") + length (" + length + ") exceed the"
                    + " capacity of the array: " + bytes.length;
        }
        return new IllegalArgumentException(reason);
    }

    /**
     * Put a long value out to the specified byte array position.
     *
     * @param bytes  the byte array
     * @param offset position in the array
     * @param val    long to write out
     * @return incremented offset
     * @throws IllegalArgumentException if the byte array given doesn't have
     *                                  enough room at the offset specified.
     */
    public static int putLong(byte[] bytes, int offset, long val) {
        if (bytes.length - offset < SIZEOF_LONG) {
            throw new IllegalArgumentException("Not enough room to put a long at"
                    + " offset " + offset + " in a " + bytes.length + " byte array");
        }
        // Fill from the least significant byte backwards so the result is big-endian.
        for (int i = offset + 7; i > offset; i--) {
            bytes[i] = (byte) val;
            val >>>= 8;
        }
        bytes[offset] = (byte) val;
        return offset + SIZEOF_LONG;
    }

    /**
     * Presumes float encoded as IEEE 754 floating-point "single format"
     *
     * @param bytes byte array
     * @return Float made from passed byte array.
     */
    public static float toFloat(byte[] bytes) {
        return toFloat(bytes, 0);
    }

    /**
     * Presumes float encoded as IEEE 754 floating-point "single format"
     *
     * @param bytes  array to convert
     * @param offset offset into array
     * @return Float made from passed byte array.
     */
    public static float toFloat(byte[] bytes, int offset) {
        return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
    }

    /**
     * Writes the raw int bits of a float.
     *
     * @param bytes  byte array
     * @param offset offset to write to
     * @param f      float value
     * @return New offset in <code>bytes</code>
     */
    public static int putFloat(byte[] bytes, int offset, float f) {
        return putInt(bytes, offset, Float.floatToRawIntBits(f));
    }

    /**
     * @param f float value
     * @return the float represented as byte []
     */
    public static byte[] toBytes(final float f) {
        // Encode it as int
        return Bytes.toBytes(Float.floatToRawIntBits(f));
    }

    /**
     * @param bytes byte array
     * @return Return double made from passed bytes.
     */
    public static double toDouble(final byte[] bytes) {
        return toDouble(bytes, 0);
    }

    /**
     * @param bytes  byte array
     * @param offset offset where double is
     * @return Return double made from passed bytes.
     */
    public static double toDouble(final byte[] bytes, final int offset) {
        return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG));
    }

    /**
     * Writes the bits produced by {@link Double#doubleToLongBits(double)}.
     *
     * @param bytes  byte array
     * @param offset offset to write to
     * @param d      value
     * @return New offset into array <code>bytes</code>
     */
    public static int putDouble(byte[] bytes, int offset, double d) {
        return putLong(bytes, offset, Double.doubleToLongBits(d));
    }

    /**
     * Serialize a double as the IEEE 754 double format output. The resultant
     * array will be 8 bytes long.
     *
     * @param d value
     * @return the double represented as byte []
     */
    public static byte[] toBytes(final double d) {
        // Encode it as a long
        return Bytes.toBytes(Double.doubleToRawLongBits(d));
    }

    /**
     * Convert an int value to a big-endian byte array
     *
     * @param val value
     * @return the byte array
     */
    public static byte[] toBytes(int val) {
        byte[] b = new byte[SIZEOF_INT];
        putInt(b, 0, val);
        return b;
    }

    /**
     * Converts a byte array to an int value
     *
     * @param bytes byte array
     * @return the int value
     */
    public static int toInt(byte[] bytes) {
        return toInt(bytes, 0, SIZEOF_INT);
    }

    /**
     * Converts a byte array to an int value
     *
     * @param bytes  byte array
     * @param offset offset into array
     * @return the int value
     */
    public static int toInt(byte[] bytes, int offset) {
        return toInt(bytes, offset, SIZEOF_INT);
    }

    /**
     * Converts a byte array to an int value
     *
     * @param bytes  byte array
     * @param offset offset into array
     * @param length length of int (has to be {@link #SIZEOF_INT})
     * @return the int value
     * @throws IllegalArgumentException if length is not {@link #SIZEOF_INT} or
     *                                  if there's not enough room in the array at the offset indicated.
     */
    public static int toInt(byte[] bytes, int offset, final int length) {
        requireExactRange(bytes, offset, length, SIZEOF_INT);
        int n = 0;
        for (int i = offset; i < (offset + length); i++) {
            n <<= 8;
            n ^= bytes[i] & 0xFF;
        }
        return n;
    }

    /**
     * Put an int value out to the specified byte array position.
     *
     * @param bytes  the byte array
     * @param offset position in the array
     * @param val    int to write out
     * @return incremented offset
     * @throws IllegalArgumentException if the byte array given doesn't have
     *                                  enough room at the offset specified.
     */
    public static int putInt(byte[] bytes, int offset, int val) {
        if (bytes.length - offset < SIZEOF_INT) {
            throw new IllegalArgumentException("Not enough room to put an int at"
                    + " offset " + offset + " in a " + bytes.length + " byte array");
        }
        for (int i = offset + 3; i > offset; i--) {
            bytes[i] = (byte) val;
            val >>>= 8;
        }
        bytes[offset] = (byte) val;
        return offset + SIZEOF_INT;
    }

    /**
     * Convert a short value to a byte array of {@link #SIZEOF_SHORT} bytes long.
     *
     * @param val value
     * @return the byte array
     */
    public static byte[] toBytes(short val) {
        byte[] b = new byte[SIZEOF_SHORT];
        putShort(b, 0, val);
        return b;
    }

    /**
     * Converts a byte array to a short value
     *
     * @param bytes byte array
     * @return the short value
     */
    public static short toShort(byte[] bytes) {
        return toShort(bytes, 0, SIZEOF_SHORT);
    }

    /**
     * Converts a byte array to a short value
     *
     * @param bytes  byte array
     * @param offset offset into array
     * @return the short value
     */
    public static short toShort(byte[] bytes, int offset) {
        return toShort(bytes, offset, SIZEOF_SHORT);
    }

    /**
     * Converts a byte array to a short value
     *
     * @param bytes  byte array
     * @param offset offset into array
     * @param length length, has to be {@link #SIZEOF_SHORT}
     * @return the short value
     * @throws IllegalArgumentException if length is not {@link #SIZEOF_SHORT}
     *                                  or if there's not enough room in the array at the offset indicated.
     */
    public static short toShort(byte[] bytes, int offset, final int length) {
        requireExactRange(bytes, offset, length, SIZEOF_SHORT);
        return (short) (((bytes[offset] & 0xFF) << 8) | (bytes[offset + 1] & 0xFF));
    }

    /**
     * This method will get a sequence of bytes from pos -> limit,
     * but will restore pos after.
     *
     * @param buf buffer to read
     * @return byte array
     */
    public static byte[] getBytes(ByteBuffer buf) {
        int savedPos = buf.position();
        byte[] newBytes = new byte[buf.remaining()];
        buf.get(newBytes);
        buf.position(savedPos);
        return newBytes;
    }

    /**
     * Put a short value out to the specified byte array position.
     *
     * @param bytes  the byte array
     * @param offset position in the array
     * @param val    short to write out
     * @return incremented offset
     * @throws IllegalArgumentException if the byte array given doesn't have
     *                                  enough room at the offset specified.
     */
    public static int putShort(byte[] bytes, int offset, short val) {
        if (bytes.length - offset < SIZEOF_SHORT) {
            throw new IllegalArgumentException("Not enough room to put a short at"
                    + " offset " + offset + " in a " + bytes.length + " byte array");
        }
        bytes[offset + 1] = (byte) val;
        bytes[offset] = (byte) (val >> 8);
        return offset + SIZEOF_SHORT;
    }

    /**
     * Reads one byte from an array.
     *
     * @throws IllegalArgumentException if length is not {@link #SIZEOF_BYTE} or the range
     *                                  runs past the end of the array
     */
    public static byte toByte(byte[] bytes, int offset, int length) {
        requireExactRange(bytes, offset, length, SIZEOF_BYTE);
        return bytes[offset];
    }

    /**
     * Convert a BigDecimal value to a byte array: the scale as a 4-byte int followed by
     * the two's-complement bytes of the unscaled value.
     *
     * @param val value to encode
     * @return the byte array
     */
    public static byte[] toBytes(BigDecimal val) {
        byte[] valueBytes = val.unscaledValue().toByteArray();
        byte[] result = new byte[valueBytes.length + SIZEOF_INT];
        int offset = putInt(result, 0, val.scale());
        putBytes(result, offset, valueBytes, 0, valueBytes.length);
        return result;
    }

    /**
     * Converts a byte array to a BigDecimal
     *
     * @param bytes array produced by {@link #toBytes(BigDecimal)}
     * @return the BigDecimal value, or {@code null} if the array is too short to hold one
     */
    public static BigDecimal toBigDecimal(byte[] bytes) {
        return toBigDecimal(bytes, 0, bytes.length);
    }

    /**
     * Converts a byte array to a BigDecimal value
     *
     * @param bytes  array holding the encoded value
     * @param offset index of the first encoded byte
     * @param length number of encoded bytes (scale plus unscaled value)
     * @return the BigDecimal value, or {@code null} if {@code bytes} is null, the range is
     * shorter than {@code SIZEOF_INT + 1}, or the range runs past the end of the array
     */
    public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) {
        if (bytes == null || length < SIZEOF_INT + 1
                || (offset + length > bytes.length)) {
            return null;
        }

        int scale = toInt(bytes, offset);
        byte[] tcBytes = new byte[length - SIZEOF_INT];
        System.arraycopy(bytes, offset + SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT);
        return new BigDecimal(new BigInteger(tcBytes), scale);
    }

    /**
     * Put a BigDecimal value out to the specified byte array position, using the layout of
     * {@link #toBytes(BigDecimal)}.
     *
     * @param bytes  the byte array; when {@code null} nothing is written
     * @param offset position in the array
     * @param val    BigDecimal to write out
     * @return incremented offset, or {@code offset} unchanged when {@code bytes} is null
     * @throws IllegalArgumentException if the byte array given doesn't have
     *                                  enough room at the offset specified.
     */
    public static int putBigDecimal(byte[] bytes, int offset, BigDecimal val) {
        if (bytes == null) {
            return offset;
        }

        byte[] valueBytes = val.unscaledValue().toByteArray();
        // Check the whole encoding up front so a too-small target is never half written.
        if (bytes.length - offset < SIZEOF_INT + valueBytes.length) {
            throw new IllegalArgumentException("Not enough room to put a BigDecimal at"
                    + " offset " + offset + " in a " + bytes.length + " byte array");
        }
        // Write into the caller's array; the encoding previously went into a scratch array
        // that was discarded, leaving 'bytes' untouched.
        int next = putInt(bytes, offset, val.scale());
        return putBytes(bytes, next, valueBytes, 0, valueBytes.length);
    }

    /**
     * @param left  left operand
     * @param right right operand
     * @return 0 if equal, < 0 if left is less than right, etc.
     */
    public static int compareTo(final byte[] left, final byte[] right) {
        return compareByteArrayInLexOrder(left, 0, left.length, right, 0, right.length);
    }

    /**
     * Lexicographically compare two arrays.
     *
     * @param buffer1 left operand
     * @param buffer2 right operand
     * @param offset1 Where to start comparing in the left buffer
     * @param offset2 Where to start comparing in the right buffer
     * @param length1 How much to compare from the left buffer
     * @param length2 How much to compare from the right buffer
     * @return 0 if equal, < 0 if left is less than right, etc.
     */
    public static int compareTo(byte[] buffer1, int offset1, int length1,
                                byte[] buffer2, int offset2, int length2) {
        return compareByteArrayInLexOrder(buffer1, offset1, length1, buffer2, offset2, length2);
    }

    /**
     * Compares two byte ranges as sequences of unsigned bytes; when one range is a prefix
     * of the other, the shorter one sorts first.
     *
     * @return 0 if equal, < 0 if the first range is less than the second, > 0 otherwise
     */
    public static int compareByteArrayInLexOrder(byte[] buffer1, int offset1, int length1,
                                                 byte[] buffer2, int offset2, int length2) {
        // Short circuit equal case
        if (buffer1 == buffer2 && offset1 == offset2 && length1 == length2) {
            return 0;
        }
        int end1 = offset1 + length1;
        int end2 = offset2 + length2;
        for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
            int a = (buffer1[i] & 0xff);
            int b = (buffer2[j] & 0xff);
            if (a != b) {
                // Both operands are in 0..255, so the subtraction cannot overflow.
                return a - b;
            }
        }
        return length1 - length2;
    }

    /**
     * @param left  left operand, may be {@code null}
     * @param right right operand, may be {@code null}
     * @return True if both are null or both hold the same bytes
     */
    public static boolean equals(final byte[] left, final byte[] right) {
        if (left == right) {
            return true;
        }
        if (left == null || right == null) {
            return false;
        }
        if (left.length != right.length) {
            return false;
        }
        if (left.length == 0) {
            return true;
        }

        // Since we're often comparing adjacent sorted data,
        // it's usual to have equal arrays except for the very last byte
        // so check that first
        if (left[left.length - 1] != right[right.length - 1]) {
            return false;
        }

        return compareTo(left, right) == 0;
    }

    /**
     * Range variant of {@link #equals(byte[], byte[])}.
     *
     * @return True if both ranges have the same length and hold the same bytes
     */
    public static boolean equals(final byte[] left, int leftOffset, int leftLen,
                                 final byte[] right, int rightOffset, int rightLen) {
        // short circuit case
        if (left == right && leftOffset == rightOffset && leftLen == rightLen) {
            return true;
        }
        // different lengths fast check
        if (leftLen != rightLen) {
            return false;
        }
        if (leftLen == 0) {
            return true;
        }

        // Same last-byte-first shortcut as the whole-array overload.
        if (left[leftOffset + leftLen - 1] != right[rightOffset + rightLen - 1]) {
            return false;
        }

        return compareByteArrayInLexOrder(left, leftOffset, leftLen, right, rightOffset, rightLen) == 0;
    }

    /**
     * Return true if the byte array on the right is a prefix of the byte
     * array on the left. Returns false if either argument is {@code null}.
     */
    public static boolean startsWith(byte[] bytes, byte[] prefix) {
        return bytes != null && prefix != null
                && bytes.length >= prefix.length
                && compareByteArrayInLexOrder(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0;
    }

    /** Hashes the whole array; see {@link #hashBytes(byte[], int, int)}. */
    public static int hashCode(final byte[] b) {
        return hashCode(b, b.length);
    }

    /** Hashes the first {@code length} bytes; see {@link #hashBytes(byte[], int, int)}. */
    public static int hashCode(final byte[] b, final int length) {
        return hashBytes(b, length);
    }

    /**
     * Compute hash for binary data: starts at 1 and folds each (signed) byte in with
     * {@code hash = 31 * hash + b}.
     *
     * @param bytes  array to hash
     * @param offset offset to start from
     * @param length length to hash
     */
    public static int hashBytes(byte[] bytes, int offset, int length) {
        int hash = 1;
        for (int i = offset; i < offset + length; i++) {
            hash = (31 * hash) + (int) bytes[i];
        }
        return hash;
    }

    /**
     * Compute hash for the first {@code length} bytes of binary data.
     */
    public static int hashBytes(byte[] bytes, int length) {
        return hashBytes(bytes, 0, length);
    }

    /**
     * Same hash as {@link #hashBytes(byte[], int, int)}.
     *
     * @param bytes  array to hash
     * @param offset offset to start from
     * @param length length to hash
     */
    public static int hashCode(byte[] bytes, int offset, int length) {
        return hashBytes(bytes, offset, length);
    }

    /**
     * Encodes a string as UTF-8 into a caller-supplied buffer, starting at index 0.
     * A valid surrogate pair is written as one 4-byte sequence; a surrogate without its
     * partner is written as a 3-byte sequence rather than rejected.
     * <p>
     * http://tools.ietf.org/html/rfc3629
     *
     * @param str    the text to encode
     * @param buffer target array; {@code 3 * str.length()} bytes are always sufficient
     * @return the number of bytes written
     */
    public static int stringtoUTF8Bytes(String str, byte[] buffer) {
        int index = 0;
        final int len = str.length();
        for (int i = 0; i < len; i++) {
            char c = str.charAt(i);
            if (c < 0x80) {
                // U+0000..U+007F -> 0xxxxxxx
                buffer[index++] = (byte) c;
            } else if (c < 0x800) {
                // U+0080..U+07FF -> 110xxxxx 10xxxxxx
                buffer[index++] = (byte) (0xC0 | (c >> 6));
                buffer[index++] = (byte) (0x80 | (c & 0x3F));
            } else if (Character.isHighSurrogate(c) && i + 1 < len
                    && Character.isLowSurrogate(str.charAt(i + 1))) {
                // U+10000..U+10FFFF -> 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
                int cp = Character.toCodePoint(c, str.charAt(++i));
                buffer[index++] = (byte) (0xF0 | (cp >> 18));
                buffer[index++] = (byte) (0x80 | ((cp >> 12) & 0x3F));
                buffer[index++] = (byte) (0x80 | ((cp >> 6) & 0x3F));
                buffer[index++] = (byte) (0x80 | (cp & 0x3F));
            } else {
                // U+0800..U+FFFF -> 1110xxxx 10xxxxxx 10xxxxxx
                buffer[index++] = (byte) (0xE0 | (c >> 12));
                buffer[index++] = (byte) (0x80 | ((c >> 6) & 0x3F));
                buffer[index++] = (byte) (0x80 | (c & 0x3F));
            }
        }
        return index;
    }
}

// Patch text for the next file in this archive, kept verbatim because it continues on the following line:
// http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java new file mode 100644 index 0000000..44fd4bb --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java @@ -0,0 +1,15 @@ +package com.alibaba.jstorm.metric; + +import java.util.UUID; + +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class DefaultMetricIDGenerator
implements MetricIDGenerator { + + @Override + public long genMetricId(String metricName) { + return UUID.randomUUID().getLeastSignificantBits(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java new file mode 100644 index 0000000..5de2b8d --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java @@ -0,0 +1,84 @@ +package com.alibaba.jstorm.metric; + +import com.alibaba.jstorm.common.metric.MetricMeta; +import com.alibaba.jstorm.common.metric.TaskTrack; +import com.alibaba.jstorm.common.metric.TopologyHistory; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; + +/** + * a dummy metric query client implementation + * + * @author Cody ([email protected]) + * @since 2.0.5 + */ +public class DefaultMetricQueryClient implements MetricQueryClient { + @Override + public void init(Map conf) { + } + + @Override + public List<MetricMeta> getMetricMeta(String clusterName, String topologyId, MetaType type, MetaFilter filter, Object arg) { + return Lists.newArrayList(); + } + + @Override + public List<MetricMeta> getMetricMeta(String clusterName, String topologyId, MetaType type) { + return Lists.newArrayList(); + } + + @Override + public List<MetricMeta> getWorkerMeta(String clusterName, String topologyId) { + return Lists.newArrayList(); + } + + @Override + public List<MetricMeta> getNettyMeta(String clusterName, String topologyId) { + return Lists.newArrayList(); + } + + @Override + public List<MetricMeta> getTaskMeta(String clusterName, String topologyId, int taskId) { + return Lists.newArrayList(); + } + + @Override + public List<MetricMeta> 
getComponentMeta(String clusterName, String topologyId, String componentId) { + return Lists.newArrayList(); + } + + @Override + public MetricMeta getMetricMeta(String clusterName, String topologyId, MetaType metaType, long metricId) { + return null; + } + + @Override + public List<Object> getMetricData(long metricId, MetricType metricType, int win, long start, long end) { + return Lists.newArrayList(); + } + + @Override + public List<TaskTrack> getTaskTrack(String clusterName, String topologyId) { + return Lists.newArrayList(); + } + + @Override + public List<TaskTrack> getTaskTrack(String clusterName, String topologyId, int taskId) { + return Lists.newArrayList(); + } + + @Override + public List<TopologyHistory> getTopologyHistory(String clusterName, String topologyName, int size) { + return Lists.newArrayList(); + } + + @Override + public void deleteMeta(MetricMeta meta) { + } + + @Override + public void deleteMeta(List<MetricMeta> metaList) { + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java index 631c38b..85e7f15 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java @@ -27,19 +27,14 @@ import com.codahale.metrics.health.HealthCheck; import com.codahale.metrics.health.HealthCheckRegistry; public class JStormHealthCheck { - private static final Logger LOG = LoggerFactory - .getLogger(JStormHealthCheck.class); + private static final Logger LOG = LoggerFactory.getLogger(JStormHealthCheck.class); - private final static Map<Integer, HealthCheckRegistry> taskHealthCheckMap = - new ConcurrentHashMap<Integer, HealthCheckRegistry>(); + 
private final static Map<Integer, HealthCheckRegistry> taskHealthCheckMap = new ConcurrentHashMap<Integer, HealthCheckRegistry>(); - private final static HealthCheckRegistry workerHealthCheck = - new HealthCheckRegistry(); + private final static HealthCheckRegistry workerHealthCheck = new HealthCheckRegistry(); - public static void registerTaskHealthCheck(int taskId, String name, - HealthCheck healthCheck) { - HealthCheckRegistry healthCheckRegister = - taskHealthCheckMap.get(taskId); + public static void registerTaskHealthCheck(int taskId, String name, HealthCheck healthCheck) { + HealthCheckRegistry healthCheckRegister = taskHealthCheckMap.get(taskId); if (healthCheckRegister == null) { healthCheckRegister = new HealthCheckRegistry(); @@ -49,14 +44,12 @@ public class JStormHealthCheck { healthCheckRegister.register(name, healthCheck); } - public static void registerWorkerHealthCheck(String name, - HealthCheck healthCheck) { + public static void registerWorkerHealthCheck(String name, HealthCheck healthCheck) { workerHealthCheck.register(name, healthCheck); } public static void unregisterTaskHealthCheck(int taskId, String name) { - HealthCheckRegistry healthCheckRegister = - taskHealthCheckMap.get(taskId); + HealthCheckRegistry healthCheckRegister = taskHealthCheckMap.get(taskId); if (healthCheckRegister != null) { healthCheckRegister.unregister(name); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java new file mode 100644 index 0000000..e344bfd --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java @@ -0,0 +1,59 @@ +package com.alibaba.jstorm.metric; + +import 
com.alibaba.jstorm.callback.RunnableCallback; +import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.daemon.worker.WorkerData; +import com.codahale.metrics.health.HealthCheckRegistry; +import com.codahale.metrics.health.HealthCheck.Result; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Created by wuchong on 15/9/17. + */ +public class JStormHealthReporter extends RunnableCallback { + private static final Logger LOG = LoggerFactory.getLogger(JStormHealthReporter.class); + private static final int THREAD_CYCLE = 60; //report every minute + private WorkerData workerData; + + public JStormHealthReporter(WorkerData workerData) { + this.workerData = workerData; + } + + @Override + public void run() { + StormClusterState clusterState = workerData.getZkCluster(); + String topologyId = workerData.getTopologyId(); + + Map<Integer, HealthCheckRegistry> taskHealthCheckMap = JStormHealthCheck.getTaskhealthcheckmap(); + int cnt = 0; + for (Map.Entry<Integer, HealthCheckRegistry> entry : taskHealthCheckMap.entrySet()) { + Integer taskId = entry.getKey(); + Map<String, Result> results = entry.getValue().runHealthChecks(); + + for (Map.Entry<String, Result> result : results.entrySet()) { + if (!result.getValue().isHealthy()) { + try { + clusterState.report_task_error(topologyId, taskId, result.getValue().getMessage(), null); + cnt++; + } catch (Exception e) { + LOG.error("Failed to update health data in ZK for topo-{} task-{}.", topologyId, taskId, e); + } + } + } + } + LOG.info("Successfully updated {} health data to ZK for topology:{}", cnt, topologyId); + } + + @Override + public Object getResult() { + return THREAD_CYCLE; + } + + @Override + public String getThreadName() { + return "HealthReporterThread"; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java 
package com.alibaba.jstorm.metric;

import backtype.storm.generated.MetricInfo;
import backtype.storm.generated.TopologyMetric;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.cache.RocksDBCache;
import com.alibaba.jstorm.cache.TimeoutMemCache;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.StormClusterState;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.utils.OSInfo;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Metrics cache. The following data is kept in the underlying {@link JStormCache}:
 * <ol>
 * <li>all topology ids</li>
 * <li>topology id ==> all metrics meta (map&lt;metric_name, metric_id&gt;)</li>
 * <li>topology id ==> all metrics data</li>
 * </ol>
 * Metric data entries are stored as {@code Object[]{timestampMillis, MetricInfo}}.
 *
 * @author Cody
 * @since 2.0.5
 */
@SuppressWarnings("unchecked")
public class JStormMetricCache {

    private static final Logger LOG = LoggerFactory.getLogger(JStormMetricCache.class);

    public static final String TIMEOUT_MEM_CACHE_CLASS = TimeoutMemCache.class.getName();
    public static final String ROCKS_DB_CACHE_CLASS = RocksDBCache.class.getName();

    /** Number of slots in the ring that holds topology-level metric snapshots. */
    private static final int RING_SIZE = 30;

    protected final Object lock = new Object();

    protected JStormCache cache = null;

    protected static final String METRIC_META_PREFIX = "__metric.meta__";
    protected static final String SENT_METRIC_META_PREFIX = "__saved.metric.meta__";
    protected static final String ALL_TOPOLOGIES_KEY = "__all.topologies__";
    protected static final String TOPOLOGY_SAMPLE_RATE = "__topology.sample.rate__";

    protected static final String METRIC_DATA_PREFIX = "__metric.data__";
    protected static final String METRIC_DATA_30M_COMPONENT = "__metric.data.comp__";
    protected static final String METRIC_DATA_30M_TASK = "__metric.data.task__";
    protected static final String METRIC_DATA_30M_STREAM = "__metric.data.stream__";
    protected static final String METRIC_DATA_30M_WORKER = "__metric.data.worker__";
    protected static final String METRIC_DATA_30M_NETTY = "__metric.data.netty__";
    protected static final String METRIC_DATA_30M_TOPOLOGY = "__metric.data.topology__";

    protected final StormClusterState zkCluster;

    /**
     * Picks the cache implementation class name.
     * Local mode and operating systems other than Linux/Mac always use the in-memory cache;
     * otherwise an explicitly configured class wins, and RocksDB is the fallback.
     *
     * @param conf storm configuration
     * @return fully-qualified class name of the cache implementation
     */
    public String getNimbusCacheClass(Map conf) {
        boolean isLinux = OSInfo.isLinux();
        boolean isMac = OSInfo.isMac();
        boolean isLocal = StormConfig.local_mode(conf);

        if (isLocal) {
            return TIMEOUT_MEM_CACHE_CLASS;
        }

        if (!isLinux && !isMac) {
            return TIMEOUT_MEM_CACHE_CLASS;
        }

        String nimbusCacheClass = ConfigExtension.getNimbusCacheClass(conf);
        if (!StringUtils.isBlank(nimbusCacheClass)) {
            return nimbusCacheClass;
        }

        return ROCKS_DB_CACHE_CLASS;
    }

    /**
     * Creates and initialises the cache. If the first initialisation fails and no reset was
     * requested, initialisation is retried once with the reset flag set.
     *
     * @param conf      storm configuration; the metric db dir and the reset flag are written into it
     * @param zkCluster cluster state handle kept for subclasses
     * @throws RuntimeException if the cache cannot be created or initialised
     */
    public JStormMetricCache(Map conf, StormClusterState zkCluster) {
        String dbCacheClass = getNimbusCacheClass(conf);
        LOG.info("JStorm metrics cache will use {}", dbCacheClass);

        boolean reset = ConfigExtension.getMetricCacheReset(conf);
        try {
            cache = (JStormCache) Utils.newInstance(dbCacheClass);

            String dbDir = StormConfig.metricDbDir(conf);
            conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir);
            conf.put(RocksDBCache.ROCKSDB_RESET, reset);
            cache.init(conf);
        } catch (Exception e) {
            if (!reset && cache != null) {
                // keep the original cause in the log, it explains why a reset was needed
                LOG.error("Failed to init rocks db, will reset and try to re-init...", e);
                conf.put(RocksDBCache.ROCKSDB_RESET, true);
                try {
                    cache.init(conf);
                } catch (Exception ex) {
                    // fail fast like the branch below instead of continuing with an
                    // uninitialised cache that would break on first use
                    LOG.error("Failed to re-init metrics cache after reset!", ex);
                    throw new RuntimeException(ex);
                }
            } else {
                LOG.error("Failed to create metrics cache!", e);
                throw new RuntimeException(e);
            }
        }

        this.zkCluster = zkCluster;
    }

    public JStormCache getCache() {
        return cache;
    }

    public JStormCache put(String k, Object v) {
        cache.put(k, v);
        return cache;
    }

    /**
     * Stores the latest metric data of a topology. Component metrics overwrite the previous
     * entry, task/stream/worker/netty metrics are merged into the existing entry, and
     * topology metrics are written into the next slot of a {@value #RING_SIZE}-slot ring.
     *
     * @param topologyId topology id
     * @param tpMetric   metrics to cache
     * @return the underlying cache
     */
    public JStormCache putMetricData(String topologyId, TopologyMetric tpMetric) {
        // map<key, [ts, metric_info]>
        Map<String, Object> batchData = new HashMap<String, Object>();
        long ts = System.currentTimeMillis();
        int tp = 0, comp = 0, task = 0, stream = 0, worker = 0, netty = 0;
        if (tpMetric.get_componentMetric().get_metrics_size() > 0) {
            batchData.put(METRIC_DATA_30M_COMPONENT + topologyId, new Object[]{ts, tpMetric.get_componentMetric()});
            comp += tpMetric.get_componentMetric().get_metrics_size();
        }
        if (tpMetric.get_taskMetric().get_metrics_size() > 0) {
            tryCombineMetricInfo(METRIC_DATA_30M_TASK + topologyId, tpMetric.get_taskMetric(), MetaType.TASK, ts);
            task += tpMetric.get_taskMetric().get_metrics_size();
        }
        if (tpMetric.get_streamMetric().get_metrics_size() > 0) {
            tryCombineMetricInfo(METRIC_DATA_30M_STREAM + topologyId, tpMetric.get_streamMetric(), MetaType.STREAM, ts);
            stream += tpMetric.get_streamMetric().get_metrics_size();
        }
        if (tpMetric.get_workerMetric().get_metrics_size() > 0) {
            tryCombineMetricInfo(METRIC_DATA_30M_WORKER + topologyId, tpMetric.get_workerMetric(), MetaType.WORKER, ts);
            worker += tpMetric.get_workerMetric().get_metrics_size();
        }
        if (tpMetric.get_nettyMetric().get_metrics_size() > 0) {
            tryCombineMetricInfo(METRIC_DATA_30M_NETTY + topologyId, tpMetric.get_nettyMetric(), MetaType.NETTY, ts);
            netty += tpMetric.get_nettyMetric().get_metrics_size();
        }

        // store snapshots of topology metrics in the ring
        if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
            String keyPrefix = METRIC_DATA_30M_TOPOLOGY + topologyId + "-";
            int page = getRingAvailableIndex(keyPrefix);

            batchData.put(keyPrefix + page, new Object[]{ts, tpMetric.get_topologyMetric()});
            tp += tpMetric.get_topologyMetric().get_metrics_size();
        }
        LOG.info("caching metric data for topology:{},tp:{},comp:{},task:{},stream:{},worker:{},netty:{},cost:{}",
                topologyId, tp, comp, task, stream, worker, netty, System.currentTimeMillis() - ts);

        return putBatch(batchData);
    }

    /**
     * Finds the ring slot to write next: the slot after the one holding the newest timestamp,
     * wrapping from {@value #RING_SIZE} back to 1. An empty ring yields slot 1.
     */
    private int getRingAvailableIndex(String keyPrefix) {
        int page = 0;
        long lastTs = 0;
        for (int idx = 1; idx <= RING_SIZE; idx++) {
            // single lookup per slot; the entry layout is [timestamp, metric info]
            Object slot = cache.get(keyPrefix + idx);
            if (slot != null) {
                long timestamp = (Long) ((Object[]) slot)[0];
                if (timestamp > lastTs) {
                    lastTs = timestamp;
                    page = idx;
                }
            }
        }
        return page < RING_SIZE ? page + 1 : 1;
    }

    /**
     * Merges the incoming metrics into the cached entry under {@code key}, or stores the
     * incoming metrics as-is when nothing is cached or the cached entry cannot be merged.
     */
    private void tryCombineMetricInfo(String key, MetricInfo incoming, MetaType metaType, long ts) {
        Object data = cache.get(key);
        if (data != null) {
            try {
                Object[] parts = (Object[]) data;
                MetricInfo old = (MetricInfo) parts[1];

                LOG.info("combine {} metrics, old:{}, new:{}",
                        metaType, old.get_metrics_size(), incoming.get_metrics_size());
                old.get_metrics().putAll(incoming.get_metrics());
                cache.put(key, new Object[]{ts, old});
            } catch (Exception e) {
                // the cached entry is unusable; replace it instead of failing the whole update
                LOG.warn("Failed to combine {} metrics for key:{}, overwriting with incoming data", metaType, key, e);
                cache.remove(key);
                cache.put(key, new Object[]{ts, incoming});
            }
        } else {
            cache.put(key, new Object[]{ts, incoming});
        }
    }

    /**
     * Returns the cached metric data of a topology for one meta type, ordered by timestamp.
     * For {@code MetaType.TOPOLOGY} every populated ring slot is returned; the other handled
     * types yield at most one element. An unhandled meta type yields an empty list.
     */
    public List<MetricInfo> getMetricData(String topologyId, MetaType metaType) {
        Map<Long, MetricInfo> retMap = new TreeMap<Long, MetricInfo>();

        String key = null;
        if (metaType == MetaType.COMPONENT) {
            key = METRIC_DATA_30M_COMPONENT + topologyId;
        } else if (metaType == MetaType.TASK) {
            key = METRIC_DATA_30M_TASK + topologyId;
        } else if (metaType == MetaType.STREAM) {
            key = METRIC_DATA_30M_STREAM + topologyId;
        } else if (metaType == MetaType.WORKER) {
            key = METRIC_DATA_30M_WORKER + topologyId;
        } else if (metaType == MetaType.NETTY) {
            key = METRIC_DATA_30M_NETTY + topologyId;
        } else if (metaType == MetaType.TOPOLOGY) {
            String keyPrefix = METRIC_DATA_30M_TOPOLOGY + topologyId + "-";
            for (int i = 1; i <= RING_SIZE; i++) {
                collectSnapshot(retMap, cache.get(keyPrefix + i));
            }
        }
        if (key != null) {
            collectSnapshot(retMap, cache.get(key));
        }
        List<MetricInfo> ret = Lists.newArrayList(retMap.values());
        int cnt = 0;
        for (MetricInfo metricInfo : ret) {
            cnt += metricInfo.get_metrics_size();
        }
        LOG.info("getMetricData, topology:{}, meta type:{}, metric info size:{}, total metric size:{}",
                topologyId, metaType, ret.size(), cnt);
        return ret;
    }

    /** Adds a cached {@code [timestamp, MetricInfo]} entry to {@code sink}; null entries are ignored. */
    private static void collectSnapshot(Map<Long, MetricInfo> sink, Object cached) {
        if (cached != null) {
            Object[] parts = (Object[]) cached;
            sink.put((Long) parts[0], (MetricInfo) parts[1]);
        }
    }

    /**
     * Writes all entries in one batch; an empty map is a no-op.
     *
     * @return the underlying cache
     */
    public JStormCache putBatch(Map<String, Object> kv) {
        if (kv.size() > 0) {
            cache.putBatch(kv);
        }
        return cache;
    }

    public Object get(String k) {
        return cache.get(k);
    }

    public void remove(String k) {
        cache.remove(k);
    }

    /** Removes the metric meta and all cached metric data of a topology. */
    public void removeTopology(String topologyId) {
        removeTopologyMeta(topologyId);
        removeTopologyData(topologyId);
    }

    protected void removeTopologyMeta(String topologyId) {
        cache.remove(METRIC_META_PREFIX + topologyId);
    }

    protected void removeTopologyData(String topologyId) {
        long start = System.currentTimeMillis();
        cache.remove(METRIC_DATA_PREFIX + topologyId);

        Set<String> metricDataKeys = new HashSet<>();
        for (int i = 1; i <= RING_SIZE; i++) {
            String metricDataKeySuffix = topologyId + "-" + i;
            metricDataKeys.add(METRIC_DATA_30M_TOPOLOGY + metricDataKeySuffix);
        }
        metricDataKeys.add(METRIC_DATA_30M_COMPONENT + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_TASK + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_STREAM + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_WORKER + topologyId);
        metricDataKeys.add(METRIC_DATA_30M_NETTY + topologyId);

        cache.removeBatch(metricDataKeys);
        LOG.info("removing metric cache of topology:{}, cost:{}", topologyId, System.currentTimeMillis() - start);
    }

    /**
     * Drops the second character of a metric name (the original code comments call this
     * "remove metric type") so the result can be compared with a worker metric prefix.
     * Names shorter than two characters are returned unchanged instead of throwing.
     */
    private static String stripMetricType(String metricName) {
        if (metricName == null || metricName.length() < 2) {
            return metricName;
        }
        return metricName.charAt(0) + metricName.substring(2);
    }

    /**
     * Removes the meta entries and cached worker metrics that belong to a dead worker.
     *
     * @param topologyId topology id
     * @param host       worker host
     * @param port       worker port
     */
    public void unregisterWorker(String topologyId, String host, int port) {
        String prefix = MetricUtils.workerMetricPrefix(topologyId, host, port);
        synchronized (lock) {
            // remove dead worker meta info in METRIC_META_PREFIX
            Map<String, Long> nodes = (Map<String, Long>) cache.get(METRIC_META_PREFIX + topologyId);
            if (nodes != null) {
                Iterator<String> keyIterator = nodes.keySet().iterator();
                while (keyIterator.hasNext()) {
                    String metricName = stripMetricType(keyIterator.next());
                    if (metricName != null && metricName.startsWith(prefix)) {
                        keyIterator.remove();
                    }
                }
                cache.put(METRIC_META_PREFIX + topologyId, nodes);
            }
            // remove dead worker in METRIC_DATA_30M_WORKER
            Object data = cache.get(METRIC_DATA_30M_WORKER + topologyId);
            if (data != null) {
                Object[] parts = (Object[]) data;
                MetricInfo old = (MetricInfo) parts[1];
                if (old.get_metrics() != null) {
                    Iterator<String> oldKeys = old.get_metrics().keySet().iterator();
                    while (oldKeys.hasNext()) {
                        String metricName = stripMetricType(oldKeys.next());
                        if (metricName != null && metricName.startsWith(prefix)) {
                            oldKeys.remove();
                            LOG.info("remove dead worker metric : {}", metricName);
                        }
                    }
                }
                cache.put(METRIC_DATA_30M_WORKER + topologyId, data);
            }
        }
    }

    public Map<String, Long> getMeta(String topologyId) {
        return (Map<String, Long>) cache.get(METRIC_META_PREFIX + topologyId);
    }

    public void putMeta(String topologyId, Object v) {
        cache.put(METRIC_META_PREFIX + topologyId, v);
    }

    public void putSampleRate(String topologyId, double sampleRate) {
        cache.put(TOPOLOGY_SAMPLE_RATE + topologyId, sampleRate);
    }

    public void removeSampleRate(String topologyId) {
        cache.remove(TOPOLOGY_SAMPLE_RATE + topologyId);
    }

    /**
     * Returns the sample rate stored for a topology, or the configured default when none is stored.
     * {@link #putSampleRate(String, double)} stores a boxed double, so numeric values are read
     * directly; string values are still parsed for backward compatibility.
     */
    public double getSampleRate(String topologyId) {
        Object rate = cache.get(TOPOLOGY_SAMPLE_RATE + topologyId);
        if (rate == null) {
            return ConfigExtension.DEFAULT_METRIC_SAMPLE_RATE;
        }
        if (rate instanceof Number) {
            return ((Number) rate).doubleValue();
        }
        return Double.parseDouble(rate.toString());
    }

    public Map<String, Long> getSentMeta(String topologyId) {
        return (Map<String, Long>) cache.get(SENT_METRIC_META_PREFIX + topologyId);
    }

    public void putSentMeta(String topologyId, Object allMetricMeta) {
        cache.put(SENT_METRIC_META_PREFIX + topologyId, allMetricMeta);
    }
}
com.alibaba.jstorm.utils.JStormUtils; +import java.io.Serializable; +import java.util.*; +/** + * @author Cody ([email protected]) + * @since 2.0.5 + */ public class JStormMetrics implements Serializable { - private static final Logger LOG = LoggerFactory - .getLogger(JStormMetrics.class); - private static final long serialVersionUID = 2046603514943797241L; + private static final long serialVersionUID = -2580242512743243267L; + + public static final String NIMBUS_METRIC_KEY = "__NIMBUS__"; + public static final String CLUSTER_METRIC_KEY = "__CLUSTER__"; + public static final String SUPERVISOR_METRIC_KEY = "__SUPERVISOR__"; + + protected static final Logger LOG = LoggerFactory.getLogger(JStormMetrics.class); /** * Metrics in this object will be uploaded to nimbus */ - static MetricRegistry workerMetrics = new MetricRegistry(); - static Map<Integer, MetricRegistry> taskMetrics = - new ConcurrentHashMap<Integer, MetricRegistry>(); - /** - * Metrics in this object will be just be output to log, won't be uploaded - * to nimbus - */ - static MetricRegistry skipMetrics = new MetricRegistry(); + protected static final AsmMetricRegistry workerMetrics = new AsmMetricRegistry(); + protected static final AsmMetricRegistry nettyMetrics = new AsmMetricRegistry(); + protected static final AsmMetricRegistry componentMetrics = new AsmMetricRegistry(); + protected static final AsmMetricRegistry taskMetrics = new AsmMetricRegistry(); + protected static final AsmMetricRegistry streamMetrics = new AsmMetricRegistry(); + protected static final AsmMetricRegistry topologyMetrics = new AsmMetricRegistry(); - protected static MetricInfo exposeWorkerMetrics; - protected static Map<String, MetricInfo> exposeNettyMetrics; - protected static Map<Integer, MetricInfo> exposeTaskMetrics; + protected static final AsmMetricRegistry[] allRegistries = { + streamMetrics, taskMetrics, componentMetrics, workerMetrics, nettyMetrics, topologyMetrics}; - static { - registerWorkerGauge(new 
com.codahale.metrics.Gauge<Double>() { + protected static String topologyId; + protected static String host; + protected static int port; + protected static boolean debug; - @Override - public Double getValue() { - // TODO Auto-generated method stub - return JStormUtils.getCpuUsage(); - } + public static final String DEFAULT_GROUP = "sys"; + public static final String NETTY_GROUP = "netty"; - }, MetricDef.CPU_USED_RATIO); + protected static Set<String> debugMetricNames = new HashSet<String>(); - registerWorkerGauge(new com.codahale.metrics.Gauge<Double>() { + static { + host = NetWorkUtils.ip(); + } - @Override - public Double getValue() { - // TODO Auto-generated method stub - return JStormUtils.getMemUsage(); - } + private static boolean enabled = true; - }, MetricDef.MEMORY_USED); + public static int getPort() { + return port; } - public static MetricRegistry registerTask(int taskId) { - MetricRegistry ret = taskMetrics.get(taskId); - if (ret == null) { - ret = new MetricRegistry(); - taskMetrics.put(taskId, ret); - LOG.info("Register task MetricRegistry " + taskId); - } + public static void setPort(int port) { + JStormMetrics.port = port; + } - return ret; + public static String getHost() { + return host; } - public static void unregisterTask(int taskId) { - taskMetrics.remove(taskId); + public static void setHost(String host) { + JStormMetrics.host = host; } - // the Metric should be one of metrics of task - // if register this metric through this function, - // the web UI would do sum operation for the metric - // the metric will display in component/topology level in web UI - public static void registerSumMetric(String name) { - MetricDef.MERGE_SUM_TAG.add(name); + public static String getTopologyId() { + return topologyId; } - public static void unregisterSumMetric(String name) { - MetricDef.MERGE_SUM_TAG.remove(name); + public static void setTopologyId(String topologyId) { + JStormMetrics.topologyId = topologyId; } - // the Metric should be one of metrics 
of task - // if register this metric through this function, - // the web UI would do sum operation for the metric - // the metric will display in component/topology level in web UI - public static void registerAvgMetric(String name) { - MetricDef.MERGE_AVG_TAG.add(name); + public static boolean isDebug() { + return debug; } - public static void unregisterAvgMetric(String name) { - MetricDef.MERGE_AVG_TAG.remove(name); + public static void setDebug(boolean debug) { + JStormMetrics.debug = debug; + LOG.info("topology metrics debug enabled:{}", debug); } - public static <T extends Metric> T registerWorkerMetric(T metric, - String name, String... args) throws IllegalArgumentException { - String registerName = MetricRegistry.name(name, args); + public static void setEnabled(boolean enabled) { + JStormMetrics.enabled = enabled; + } - return workerMetrics.register(registerName, metric); + public static boolean isEnabled() { + return enabled; } - public static void unregisterWorkerMetric(String name, String... args) { - String registerName = MetricRegistry.name(name, args); + public static String workerMetricName(String name, MetricType type) { + return MetricUtils.workerMetricName(topologyId, host, port, name, type); + } - workerMetrics.remove(registerName); + public static void addDebugMetrics(String names) { + String[] metrics = names.split(","); + for (String metric : metrics) { + metric = metric.trim(); + if (!StringUtils.isBlank(metric)) { + debugMetricNames.add(metric); + } + } + LOG.info("debug metric names:{}", Joiner.on(",").join(debugMetricNames)); } - public static <T extends Metric> T registerTaskMetric(T metric, int taskId, - String name, String... 
args) throws IllegalArgumentException { - MetricRegistry metrics = taskMetrics.get(taskId); - if (metrics == null) { - throw new IllegalArgumentException("Invalid taskId " + taskId); + /** + * reserve for debug purposes + */ + public static AsmMetric find(String name) { + for (AsmMetricRegistry registry : allRegistries) { + AsmMetric metric = registry.getMetric(name); + if (metric != null) { + return metric; + } } + return null; + } - String registerName = MetricRegistry.name(name, args); + public static AsmMetric registerStreamMetric(String name, AsmMetric metric, boolean mergeTopology) { + name = fixNameIfPossible(name); + LOG.info("register stream metric:{}", name); + AsmMetric ret = streamMetrics.register(name, metric); - return metrics.register(registerName, metric); - } + if (metric.isAggregate()) { + List<AsmMetric> assocMetrics = new ArrayList<>(); + + String taskMetricName = MetricUtils.stream2taskName(name); + AsmMetric taskMetric = taskMetrics.register(taskMetricName, metric.clone()); + assocMetrics.add(taskMetric); + + String compMetricName = MetricUtils.task2compName(taskMetricName); + AsmMetric componentMetric = componentMetrics.register(compMetricName, taskMetric.clone()); + assocMetrics.add(componentMetric); + + String metricName = MetricUtils.getMetricName(name); + if (metricName.contains(".")){ + compMetricName = MetricUtils.task2MergeCompName(taskMetricName); + AsmMetric mergeCompMetric = componentMetrics.register(compMetricName, taskMetric.clone()); + assocMetrics.add(mergeCompMetric); + } + + if (mergeTopology){ + String topologyMetricName = MetricUtils.comp2topologyName(compMetricName); + AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone()); + assocMetrics.add(topologyMetric); + } - public static void unregisterTaskMetric(int taskId, String name, - String... 
args) throws IllegalArgumentException { - String registerName = MetricRegistry.name(name, args); - MetricRegistry metrics = taskMetrics.get(taskId); - if (metrics == null) { - throw new IllegalArgumentException("Invalid taskId"); + ret.addAssocMetrics(assocMetrics.toArray(new AsmMetric[assocMetrics.size()])); } - metrics.remove(registerName); - } - public static Gauge<Double> registerWorkerGauge( - com.codahale.metrics.Gauge<Double> rawGauge, String name, - String... args) { - Gauge<Double> ret = new Gauge<Double>(rawGauge); - registerWorkerMetric(ret, name, args); return ret; } - public static Gauge<Double> registerTaskGauge( - com.codahale.metrics.Gauge<Double> rawGauge, int taskId, - String name, String... args) { - Gauge<Double> ret = new Gauge<Double>(rawGauge); - registerTaskMetric(ret, taskId, name, args); + public static AsmMetric registerTaskMetric(String name, AsmMetric metric) { + name = fixNameIfPossible(name); + AsmMetric ret = taskMetrics.register(name, metric); + + if (metric.isAggregate()) { + String compMetricName = MetricUtils.task2compName(name); + AsmMetric componentMetric = componentMetrics.register(compMetricName, ret.clone()); + + ret.addAssocMetrics(componentMetric); + } + return ret; } - public static Counter<Double> registerWorkerCounter(String name, - String... 
args) throws IllegalArgumentException { - Counter<Double> ret = - (Counter<Double>) Builder.mkInstance(Builder.COUNTER); - registerWorkerMetric(ret, name, args); - return ret; +// public static AsmMetric registerStreamTopologyMetric(String name, AsmMetric metric) { +// name = fixNameIfPossible(name); +// LOG.info("register stream metric:{}", name); +// AsmMetric ret = streamMetrics.register(name, metric); +// +// if (metric.isAggregate()) { +// String taskMetricName = MetricUtils.stream2taskName(name); +// AsmMetric taskMetric = taskMetrics.register(taskMetricName, ret.clone()); +// +// String compMetricName = MetricUtils.task2compName(taskMetricName); +// AsmMetric componentMetric = componentMetrics.register(compMetricName, ret.clone()); +// +// String topologyMetricName = MetricUtils.comp2topologyName(compMetricName); +// AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone()); +// +// ret.addAssocMetrics(taskMetric, componentMetric, topologyMetric); +// } +// +// return ret; +// } + + public static AsmMetric registerWorkerMetric(String name, AsmMetric metric) { + name = fixNameIfPossible(name); + return workerMetrics.register(name, metric); } - public static Counter<Double> registerTaskCounter(int taskId, String name, - String... args) { - Counter<Double> ret = - (Counter<Double>) Builder.mkInstance(Builder.COUNTER); - registerTaskMetric(ret, taskId, name, args); + public static AsmMetric registerWorkerTopologyMetric(String name, AsmMetric metric) { + name = fixNameIfPossible(name); + AsmMetric ret = workerMetrics.register(name, metric); + + String topologyMetricName = MetricUtils.worker2topologyName(name); + AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone()); + + ret.addAssocMetrics(topologyMetric); + return ret; } - public static Meter registerWorkerMeter(String name, String... 
args) - throws IllegalArgumentException { - Meter ret = (Meter) Builder.mkInstance(Builder.METER); - registerWorkerMetric(ret, name, args); - return ret; + public static AsmMetric registerNettyMetric(String name, AsmMetric metric) { + name = fixNameIfPossible(name, NETTY_GROUP); + return nettyMetrics.register(name, metric); } - public static Meter registerTaskMeter(int taskId, String name, - String... args) { - Meter ret = (Meter) Builder.mkInstance(Builder.METER); - registerTaskMetric(ret, taskId, name, args); - return ret; + /** + * simplified helper method to register a worker histogram + * + * @param topologyId topology id + * @param name metric name, NOTE it's not a full-qualified name. + * @param histogram histogram + * @return registered histogram + */ + public static AsmHistogram registerWorkerHistogram(String topologyId, String name, AsmHistogram histogram) { + return (AsmHistogram) registerWorkerMetric( + MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.HISTOGRAM), histogram); } - public static Histogram registerWorkerHistogram(String name, String... args) - throws IllegalArgumentException { - Histogram ret = (Histogram) Builder.mkInstance(Builder.HISTOGRAM); - registerWorkerMetric(ret, name, args); - return ret; + /** + * simplified helper method to register a worker gauge + */ + public static AsmGauge registerWorkerGauge(String topologyId, String name, AsmGauge gauge) { + return (AsmGauge) registerWorkerMetric( + MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.GAUGE), gauge); } - public static Histogram registerTaskHistogram(int taskId, String name, - String... 
args) { - Histogram ret = (Histogram) Builder.mkInstance(Builder.HISTOGRAM); - registerTaskMetric(ret, taskId, name, args); - return ret; + /** + * simplified helper method to register a worker meter + */ + public static AsmMeter registerWorkerMeter(String topologyId, String name, AsmMeter meter) { + return (AsmMeter) registerWorkerMetric( + MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.METER), meter); } - public static Timer registerWorkerTimer(String name, String... args) - throws IllegalArgumentException { - Timer ret = (Timer) Builder.mkInstance(Builder.TIMER); - registerWorkerMetric(ret, name, args); - return ret; + /** + * simplified helper method to register a worker counter + */ + public static AsmCounter registerWorkerCounter(String topologyId, String name, AsmCounter counter) { + return (AsmCounter) registerWorkerMetric( + MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.COUNTER), counter); } - public static Timer registerTaskTimer(int taskId, String name, - String... 
args) { - Timer ret = (Timer) Builder.mkInstance(Builder.TIMER); - registerTaskMetric(ret, taskId, name, args); - return ret; + /** + * simplified helper method to register a worker timer + */ + public static AsmTimer registerWorkerTimer(String topologyId, String name, AsmTimer timer) { + return (AsmTimer) registerWorkerMetric( + MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.TIMER), timer); } - public static class Builder { - public static final int COUNTER = 1; - public static final int METER = 2; - public static final int HISTOGRAM = 3; - public static final int TIMER = 4; - - public static Metric mkInstance(int type) { - if (type == COUNTER) { - return new Counter<Double>(Double.valueOf(0)); - } else if (type == METER) { - return new Meter(); - } else if (type == HISTOGRAM) { - return new Histogram(); - } else if (type == TIMER) { - return new Timer(); - } else { - throw new IllegalArgumentException(); - } - } + public static AsmMetric getStreamMetric(String name) { + name = fixNameIfPossible(name); + return streamMetrics.getMetric(name); + } + + public static AsmMetric getTaskMetric(String name) { + name = fixNameIfPossible(name); + return taskMetrics.getMetric(name); + } + + public static AsmMetric getComponentMetric(String name) { + name = fixNameIfPossible(name); + return componentMetrics.getMetric(name); + } + + public static AsmMetric getWorkerMetric(String name) { + name = fixNameIfPossible(name); + return workerMetrics.getMetric(name); } - public static MetricInfo getExposeWorkerMetrics() { - return exposeWorkerMetrics; + public static void unregisterWorkerMetric(String name) { + name = fixNameIfPossible(name); + workerMetrics.remove(name); } - public static void setExposeWorkerMetrics(MetricInfo exposeWorkerMetrics) { - JStormMetrics.exposeWorkerMetrics = exposeWorkerMetrics; + public static void unregisterNettyMetric(String name) { + name = fixNameIfPossible(name, NETTY_GROUP); + nettyMetrics.remove(name); } - public static 
Map<Integer, MetricInfo> getExposeTaskMetrics() { - return exposeTaskMetrics; + public static void unregisterTaskMetric(String name) { + name = fixNameIfPossible(name); + taskMetrics.remove(name); } - public static void setExposeTaskMetrics( - Map<Integer, MetricInfo> exposeTaskMetrics) { - JStormMetrics.exposeTaskMetrics = exposeTaskMetrics; + public static AsmMetricRegistry getNettyMetrics() { + return nettyMetrics; } - public static Map<String, MetricInfo> getExposeNettyMetrics() { - return exposeNettyMetrics; + public static AsmMetricRegistry getWorkerMetrics() { + return workerMetrics; } - public static void setExposeNettyMetrics(Map<String, MetricInfo> exposeNettyMetrics) { - JStormMetrics.exposeNettyMetrics = exposeNettyMetrics; + public static AsmMetricRegistry getComponentMetrics() { + return componentMetrics; } - + public static AsmMetricRegistry getTaskMetrics() { + return taskMetrics; + } + + public static AsmMetricRegistry getStreamMetrics() { + return streamMetrics; + } + + /** + * convert snapshots to thrift objects, note that timestamps are aligned to min during the conversion, + * so nimbus server will get snapshots with aligned timestamps (still in ms as TDDL will use it). 
+ */ + public static MetricInfo computeAllMetrics() { + long start = System.currentTimeMillis(); + MetricInfo metricInfo = MetricUtils.mkMetricInfo(); + + List<Map.Entry<String, AsmMetric>> entries = Lists.newArrayList(); + entries.addAll(streamMetrics.metrics.entrySet()); + entries.addAll(taskMetrics.metrics.entrySet()); + entries.addAll(componentMetrics.metrics.entrySet()); + entries.addAll(workerMetrics.metrics.entrySet()); + entries.addAll(nettyMetrics.metrics.entrySet()); + entries.addAll(topologyMetrics.metrics.entrySet()); + + for (Map.Entry<String, AsmMetric> entry : entries) { + String name = entry.getKey(); + AsmMetric metric = entry.getValue(); + Map<Integer, AsmSnapshot> snapshots = metric.getSnapshots(); + + int op = metric.getOp(); + if ((op & AsmMetric.MetricOp.LOG) == AsmMetric.MetricOp.LOG) { + MetricUtils.printMetricSnapshot(metric, snapshots); + } + + if ((op & AsmMetric.MetricOp.REPORT) == AsmMetric.MetricOp.REPORT) { + MetaType metaType = MetricUtils.metaType(metric.getMetricName()); + try { + if (metric instanceof AsmCounter) { + Map data = MetricUtils.toThriftCounterSnapshots(snapshots); + putIfNotEmpty(metricInfo.get_metrics(), name, data); + } else if (metric instanceof AsmGauge) { + Map data = MetricUtils.toThriftGaugeSnapshots(snapshots); + putIfNotEmpty(metricInfo.get_metrics(), name, data); + } else if (metric instanceof AsmMeter) { + Map data = MetricUtils.toThriftMeterSnapshots(snapshots); + putIfNotEmpty(metricInfo.get_metrics(), name, data); + } else if (metric instanceof AsmHistogram) { + Map data = MetricUtils.toThriftHistoSnapshots(metaType, snapshots); + putIfNotEmpty(metricInfo.get_metrics(), name, data); + } else if (metric instanceof AsmTimer) { + Map data = MetricUtils.toThriftTimerSnapshots(metaType, snapshots); + putIfNotEmpty(metricInfo.get_metrics(), name, data); + } + } catch (Exception ex) { + LOG.error("Error", ex); + } + } + } + + if (debug) { + MetricUtils.printMetricInfo(metricInfo, debugMetricNames); + } + 
LOG.info("compute all metrics, cost:{}", System.currentTimeMillis() - start); + + return metricInfo; + } + + @SuppressWarnings("unchecked") + public static <T extends Map> void putIfNotEmpty(Map base, String name, T data) { + if (data != null && data.size() > 0) { + base.put(name, data); + } + } + + public static String fixNameIfPossible(String name) { + return fixNameIfPossible(name, DEFAULT_GROUP); + } + + public static String fixNameIfPossible(String name, String group) { + MetaType type = MetricUtils.metaType(name); + String[] parts = name.split(MetricUtils.DELIM); + if (parts[1].equals("")) { + parts[1] = topologyId; + } + if (type != MetaType.WORKER && parts[5].equals("")) { + parts[5] = group; + } else if (parts[2].equals("")) { + parts[2] = host; + parts[3] = port + ""; + if (parts[4].equals("")) { + parts[4] = group; + } + } + return MetricUtils.concat(parts); + } + + public static void main(String[] args) throws Exception { + JStormMetrics.topologyId = "topologyId"; + JStormMetrics.host = "127.0.0.1"; + JStormMetrics.port = 6800; + + String tpId = "test"; + String compName = "bolt"; + int taskId = 1; + String streamId = "defaultStream"; + String type = MetaType.STREAM.getV() + MetricType.COUNTER.getV(); + String metricName = "counter1"; + String group = "udf"; + + String name = MetricUtils.metricName(type, tpId, compName, taskId, streamId, group, metricName); + System.out.println(name); + + AsmCounter counter = new AsmCounter(); + AsmMetric ret1 = JStormMetrics.registerStreamMetric(name, counter, false); + AsmMetric ret2 = JStormMetrics.registerStreamMetric(name, counter, false); + System.out.println(ret1 == ret2); + + counter.update(1L); + + metricName = MetricUtils.workerMetricName("metric1", MetricType.COUNTER); + System.out.println(metricName); + metricName = fixNameIfPossible(metricName); + System.out.println(metricName); + System.out.println(fixNameIfPossible(metricName)); + } - }
