http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java deleted file mode 100644 index 5be72fe..0000000 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.common.message; - -import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK; -import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH; -import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD; -import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG; -import org.apache.kafka.common.utils.Utils; - -import net.jpountz.lz4.LZ4Exception; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4SafeDecompressor; -import net.jpountz.xxhash.XXHash32; -import net.jpountz.xxhash.XXHashFactory; - -/** - * A partial implementation of the v1.4.1 LZ4 Frame format. - * - * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a> - */ -public final class KafkaLZ4BlockInputStream extends FilterInputStream { - - public static final String PREMATURE_EOS = "Stream ended prematurely"; - public static final String NOT_SUPPORTED = "Stream unsupported"; - public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch"; - public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted"; - - private final LZ4SafeDecompressor decompressor; - private final XXHash32 checksum; - private final byte[] buffer; - private final byte[] compressedBuffer; - private final int maxBlockSize; - private FLG flg; - private BD bd; - private int bufferOffset; - private int bufferSize; - private boolean finished; - - /** - * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm. - * - * @param in The stream to decompress - * @throws IOException - */ - public KafkaLZ4BlockInputStream(InputStream in) throws IOException { - super(in); - decompressor = LZ4Factory.fastestInstance().safeDecompressor(); - checksum = XXHashFactory.fastestInstance().hash32(); - readHeader(); - maxBlockSize = bd.getBlockMaximumSize(); - buffer = new byte[maxBlockSize]; - compressedBuffer = new byte[maxBlockSize]; - bufferOffset = 0; - bufferSize = 0; - finished = false; - } - - /** - * Reads the magic number and frame descriptor from the underlying {@link InputStream}. - * - * @throws IOException - */ - private void readHeader() throws IOException { - byte[] header = new byte[LZ4_MAX_HEADER_LENGTH]; - - // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags - bufferOffset = 6; - if (in.read(header, 0, bufferOffset) != bufferOffset) { - throw new IOException(PREMATURE_EOS); - } - - if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) { - throw new IOException(NOT_SUPPORTED); - } - flg = FLG.fromByte(header[bufferOffset-2]); - bd = BD.fromByte(header[bufferOffset-1]); - // TODO read uncompressed content size, update flg.validate() - // TODO read dictionary id, update flg.validate() - - // check stream descriptor hash - byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF); - header[bufferOffset++] = (byte) in.read(); - if (hash != header[bufferOffset-1]) { - throw new IOException(DESCRIPTOR_HASH_MISMATCH); - } - } - - /** - * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, - * and writes the result to a buffer. - * - * @throws IOException - */ - private void readBlock() throws IOException { - int blockSize = Utils.readUnsignedIntLE(in); - - // Check for EndMark - if (blockSize == 0) { - finished = true; - // TODO implement content checksum, update flg.validate() - return; - } else if (blockSize > maxBlockSize) { - throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize)); - } - - boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0; - byte[] bufferToRead; - if (compressed) { - bufferToRead = compressedBuffer; - } else { - blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK; - bufferToRead = buffer; - bufferSize = blockSize; - } - - if (in.read(bufferToRead, 0, blockSize) != blockSize) { - throw new IOException(PREMATURE_EOS); - } - - // verify checksum - if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) { - throw new IOException(BLOCK_HASH_MISMATCH); - } - - if (compressed) { - try { - bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize); - } catch (LZ4Exception e) { - throw new IOException(e); - } - } - - bufferOffset = 0; - } - - @Override - public int read() throws IOException { - if (finished) { - return -1; - } - if (available() == 0) { - readBlock(); - } - if (finished) { - return -1; - } - int value = buffer[bufferOffset++] & 0xFF; - - return value; - } - - @Override - public int read(byte b[], int off, int len) throws IOException { - net.jpountz.util.Utils.checkRange(b, off, len); - if (finished) { - return -1; - } - if (available() == 0) { - readBlock(); - } - if (finished) { - return -1; - } - len = Math.min(len, available()); - System.arraycopy(buffer, bufferOffset, b, off, len); - bufferOffset += len; - return len; - } - - @Override - public long skip(long n) throws IOException { - if (finished) { - return 0; - } - if (available() == 0) { - readBlock(); - } - if (finished) { - return 0; - } - n = Math.min(n, available()); - bufferOffset += n; - return n; - } - - @Override - public int available() throws IOException { - return bufferSize - bufferOffset; - } - - @Override - public void close() throws IOException { - in.close(); - } - - @Override - public synchronized void mark(int readlimit) { - throw new RuntimeException("mark not supported"); - } - - @Override - public synchronized void reset() throws IOException { - throw new RuntimeException("reset not supported"); - } - - @Override - public boolean markSupported() { - return false; - } - -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java deleted file mode 100644 index e5b9e43..0000000 --- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java +++ /dev/null @@ -1,387 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.common.message; - -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.kafka.common.utils.Utils; - -import net.jpountz.lz4.LZ4Compressor; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.xxhash.XXHash32; -import net.jpountz.xxhash.XXHashFactory; - -/** - * A partial implementation of the v1.4.1 LZ4 Frame format. - * - * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a> - */ -public final class KafkaLZ4BlockOutputStream extends FilterOutputStream { - - public static final int MAGIC = 0x184D2204; - public static final int LZ4_MAX_HEADER_LENGTH = 19; - public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000; - - public static final String CLOSED_STREAM = "The stream is already closed"; - - public static final int BLOCKSIZE_64KB = 4; - public static final int BLOCKSIZE_256KB = 5; - public static final int BLOCKSIZE_1MB = 6; - public static final int BLOCKSIZE_4MB = 7; - - private final LZ4Compressor compressor; - private final XXHash32 checksum; - private final FLG flg; - private final BD bd; - private final byte[] buffer; - private final byte[] compressedBuffer; - private final int maxBlockSize; - private int bufferOffset; - private boolean finished; - - /** - * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. - * - * @param out The output stream to compress - * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception - * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for every block of data - * @throws IOException - */ - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException { - super(out); - compressor = LZ4Factory.fastestInstance().fastCompressor(); - checksum = XXHashFactory.fastestInstance().hash32(); - bd = new BD(blockSize); - flg = new FLG(blockChecksum); - bufferOffset = 0; - maxBlockSize = bd.getBlockMaximumSize(); - buffer = new byte[maxBlockSize]; - compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)]; - finished = false; - writeHeader(); - } - - /** - * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. - * - * @param out The stream to compress - * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception - * @throws IOException - */ - public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException { - this(out, blockSize, false); - } - - /** - * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. - * - * @param out The output stream to compress - * @throws IOException - */ - public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException { - this(out, BLOCKSIZE_64KB); - } - - /** - * Writes the magic number and frame descriptor to the underlying {@link OutputStream}. - * - * @throws IOException - */ - private void writeHeader() throws IOException { - Utils.writeUnsignedIntLE(buffer, 0, MAGIC); - bufferOffset = 4; - buffer[bufferOffset++] = flg.toByte(); - buffer[bufferOffset++] = bd.toByte(); - // TODO write uncompressed content size, update flg.validate() - // TODO write dictionary id, update flg.validate() - // compute checksum on all descriptor fields - int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF; - buffer[bufferOffset++] = (byte) hash; - // write out frame descriptor - out.write(buffer, 0, bufferOffset); - bufferOffset = 0; - } - - /** - * Compresses buffered data, optionally computes an XXHash32 checksum, and writes - * the result to the underlying {@link OutputStream}. - * - * @throws IOException - */ - private void writeBlock() throws IOException { - if (bufferOffset == 0) { - return; - } - - int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0); - byte[] bufferToWrite = compressedBuffer; - int compressMethod = 0; - - // Store block uncompressed if compressed length is greater (incompressible) - if (compressedLength >= bufferOffset) { - bufferToWrite = buffer; - compressedLength = bufferOffset; - compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK; - } - - // Write content - Utils.writeUnsignedIntLE(out, compressedLength | compressMethod); - out.write(bufferToWrite, 0, compressedLength); - - // Calculate and write block checksum - if (flg.isBlockChecksumSet()) { - int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0); - Utils.writeUnsignedIntLE(out, hash); - } - bufferOffset = 0; - } - - /** - * Similar to the {@link #writeBlock()} method. Writes a 0-length block - * (without block checksum) to signal the end of the block stream. - * - * @throws IOException - */ - private void writeEndMark() throws IOException { - Utils.writeUnsignedIntLE(out, 0); - // TODO implement content checksum, update flg.validate() - finished = true; - } - - @Override - public void write(int b) throws IOException { - ensureNotFinished(); - if (bufferOffset == maxBlockSize) { - writeBlock(); - } - buffer[bufferOffset++] = (byte) b; - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - net.jpountz.util.Utils.checkRange(b, off, len); - ensureNotFinished(); - - int bufferRemainingLength = maxBlockSize - bufferOffset; - // while b will fill the buffer - while (len > bufferRemainingLength) { - // fill remaining space in buffer - System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength); - bufferOffset = maxBlockSize; - writeBlock(); - // compute new offset and length - off += bufferRemainingLength; - len -= bufferRemainingLength; - bufferRemainingLength = maxBlockSize; - } - - System.arraycopy(b, off, buffer, bufferOffset, len); - bufferOffset += len; - } - - @Override - public void flush() throws IOException { - if (!finished) { - writeBlock(); - } - if (out != null) { - out.flush(); - } - } - - /** - * A simple state check to ensure the stream is still open. - */ - private void ensureNotFinished() { - if (finished) { - throw new IllegalStateException(CLOSED_STREAM); - } - } - - @Override - public void close() throws IOException { - if (!finished) { - writeEndMark(); - flush(); - finished = true; - } - if (out != null) { - out.close(); - out = null; - } - } - - public static class FLG { - - private static final int VERSION = 1; - - private final int presetDictionary; - private final int reserved1; - private final int contentChecksum; - private final int contentSize; - private final int blockChecksum; - private final int blockIndependence; - private final int version; - - public FLG() { - this(false); - } - - public FLG(boolean blockChecksum) { - this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION); - } - - private FLG(int presetDictionary, int reserved1, int contentChecksum, - int contentSize, int blockChecksum, int blockIndependence, int version) { - this.presetDictionary = presetDictionary; - this.reserved1 = reserved1; - this.contentChecksum = contentChecksum; - this.contentSize = contentSize; - this.blockChecksum = blockChecksum; - this.blockIndependence = blockIndependence; - this.version = version; - validate(); - } - - public static FLG fromByte(byte flg) { - int presetDictionary = (flg >>> 0) & 1; - int reserved1 = (flg >>> 1) & 1; - int contentChecksum = (flg >>> 2) & 1; - int contentSize = (flg >>> 3) & 1; - int blockChecksum = (flg >>> 4) & 1; - int blockIndependence = (flg >>> 5) & 1; - int version = (flg >>> 6) & 3; - - return new FLG(presetDictionary, reserved1, contentChecksum, - contentSize, blockChecksum, blockIndependence, version); - } - - public byte toByte() { - return (byte) ( - ((presetDictionary & 1) << 0) - | ((reserved1 & 1) << 1) - | ((contentChecksum & 1) << 2) - | ((contentSize & 1) << 3) - | ((blockChecksum & 1) << 4) - | ((blockIndependence & 1) << 5) - | ((version & 3) << 6) ); - } - - private void validate() { - if (presetDictionary != 0) { - throw new RuntimeException("Preset dictionary is unsupported"); - } - if (reserved1 != 0) { - throw new RuntimeException("Reserved1 field must be 0"); - } - if (contentChecksum != 0) { - throw new RuntimeException("Content checksum is unsupported"); - } - if (contentSize != 0) { - throw new RuntimeException("Content size is unsupported"); - } - if (blockIndependence != 1) { - throw new RuntimeException("Dependent block stream is unsupported"); - } - if (version != VERSION) { - throw new RuntimeException(String.format("Version %d is unsupported", version)); - } - } - - public boolean isPresetDictionarySet() { - return presetDictionary == 1; - } - - public boolean isContentChecksumSet() { - return contentChecksum == 1; - } - - public boolean isContentSizeSet() { - return contentSize == 1; - } - - public boolean isBlockChecksumSet() { - return blockChecksum == 1; - } - - public boolean isBlockIndependenceSet() { - return blockIndependence == 1; - } - - public int getVersion() { - return version; - } - } - - public static class BD { - - private final int reserved2; - private final int blockSizeValue; - private final int reserved3; - - public BD() { - this(0, BLOCKSIZE_64KB, 0); - } - - public BD(int blockSizeValue) { - this(0, blockSizeValue, 0); - } - - private BD(int reserved2, int blockSizeValue, int reserved3) { - this.reserved2 = reserved2; - this.blockSizeValue = blockSizeValue; - this.reserved3 = reserved3; - validate(); - } - - public static BD fromByte(byte bd) { - int reserved2 = (bd >>> 0) & 15; - int blockMaximumSize = (bd >>> 4) & 7; - int reserved3 = (bd >>> 7) & 1; - - return new BD(reserved2, blockMaximumSize, reserved3); - } - - private void validate() { - if (reserved2 != 0) { - throw new RuntimeException("Reserved2 field must be 0"); - } - if (blockSizeValue < 4 || blockSizeValue > 7) { - throw new RuntimeException("Block size value must be between 4 and 7"); - } - if (reserved3 != 0) { - throw new RuntimeException("Reserved3 field must be 0"); - } - } - - // 2^(2n+8) - public int getBlockMaximumSize() { - return (1 << ((2 * blockSizeValue) + 8)); - } - - public byte toByte() { - return (byte) ( - ((reserved2 & 15) << 0) - | ((blockSizeValue & 7) << 4) - | ((reserved3 & 1) << 7) ); - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 9c20538..6b9590c 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; public class JmxReporter implements MetricsReporter { private static final Logger log = LoggerFactory.getLogger(JmxReporter.class); - private static final Object lock = new Object(); + private static final Object LOCK = new Object(); private String prefix; private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>(); @@ -58,12 +58,11 @@ public class JmxReporter implements MetricsReporter { } @Override - public void configure(Map<String, ?> configs) { - } + public void configure(Map<String, ?> configs) {} @Override public void init(List<KafkaMetric> metrics) { - synchronized (lock) { + synchronized (LOCK) { for (KafkaMetric metric : metrics) addAttribute(metric); for (KafkaMbean mbean : mbeans.values()) @@ -73,7 +72,7 @@ public class JmxReporter implements MetricsReporter { @Override public void metricChange(KafkaMetric metric) { - synchronized (lock) { + synchronized (LOCK) { KafkaMbean mbean = addAttribute(metric); reregister(mbean); } @@ -86,36 +85,35 @@ public class JmxReporter implements MetricsReporter { if (!this.mbeans.containsKey(mBeanName)) mbeans.put(mBeanName, new KafkaMbean(mBeanName)); KafkaMbean mbean = this.mbeans.get(mBeanName); - mbean.setAttribute(metricName.name() , metric); + mbean.setAttribute(metricName.name(), metric); return mbean; } catch (JMException e) { throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e); } } - /** - * @param metricName - * @return standard JMX MBean name in the following format - * domainName:type=metricType,key1=val1,key2=val2 - */ - private String getMBeanName(MetricName metricName) { - StringBuilder mBeanName = new StringBuilder(); - mBeanName.append(prefix); - mBeanName.append(":type="); - mBeanName.append(metricName.group()); - for (Map.Entry<String, String> entry : metricName.tags().entrySet()) { - if(entry.getKey().length() <= 0 || entry.getValue().length() <= 0) - continue; - mBeanName.append(","); - mBeanName.append(entry.getKey()); - mBeanName.append("="); - mBeanName.append(entry.getValue()); + /** + * @param metricName + * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2 + */ + private String getMBeanName(MetricName metricName) { + StringBuilder mBeanName = new StringBuilder(); + mBeanName.append(prefix); + mBeanName.append(":type="); + mBeanName.append(metricName.group()); + for (Map.Entry<String, String> entry : metricName.tags().entrySet()) { + if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0) + continue; + mBeanName.append(","); + mBeanName.append(entry.getKey()); + mBeanName.append("="); + mBeanName.append(entry.getValue()); + } + return mBeanName.toString(); } - return mBeanName.toString(); - } public void close() { - synchronized (lock) { + synchronized (LOCK) { for (KafkaMbean mbean : this.mbeans.values()) unregister(mbean); } @@ -185,7 +183,12 @@ public class JmxReporter implements MetricsReporter { for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) { String attribute = entry.getKey(); KafkaMetric metric = entry.getValue(); - attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.metricName().description(), true, false, false); + attrs[i] = new MBeanAttributeInfo(attribute, + double.class.getName(), + metric.metricName().description(), + true, + false, + false); i += 1; } return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index e53cfaa..ca823fd 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -147,7 +147,7 @@ public final class Sensor { * @param stat The statistic to keep */ public void add(MetricName metricName, MeasurableStat stat) { - add(metricName, stat, null); + add(metricName, stat, null); } /** @@ -157,11 +157,11 @@ public final class Sensor { * @param config A special configuration for this metric. If null use the sensor default configuration. */ public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) { - KafkaMetric metric = new KafkaMetric(new Object(), - Utils.notNull(metricName), - Utils.notNull(stat), - config == null ? this.config : config, - time); + KafkaMetric metric = new KafkaMetric(new Object(), + Utils.notNull(metricName), + Utils.notNull(stat), + config == null ? this.config : config, + time); this.registry.registerMetric(metric); this.metrics.add(metric); this.stats.add(stat); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index a5838b3..98429da 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -71,7 +71,7 @@ public class Rate implements MeasurableStat { case MILLISECONDS: return time; case SECONDS: - return time / (1000.0); + return time / 1000.0; case MINUTES: return time / (60.0 * 1000.0); case HOURS: http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index dcc639a..fc0d168 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -50,7 +50,7 @@ public class NetworkReceive implements Receive { @Override public ByteBuffer[] reify() { - return new ByteBuffer[] { this.buffer }; + return new ByteBuffer[] {this.buffer}; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index e18a769..6baad93 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -14,7 +14,6 @@ package org.apache.kafka.common.network; import java.io.EOFException; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.CancelledKeyException; @@ -275,7 +274,7 @@ public class Selector implements Selectable { } } catch (IOException e) { String desc = socketDescription(channel); - if(e instanceof EOFException) + if (e instanceof EOFException) log.info("Connection {} disconnected", desc); else log.warn("Error in I/O with connection to {}", desc, e); @@ -290,9 +289,9 @@ public class Selector implements Selectable { private String socketDescription(SocketChannel channel) { Socket socket = channel.socket(); - if(socket == null) + if (socket == null) return "[unconnected socket]"; - else if(socket.getInetAddress() != null) + else if (socket.getInetAddress() != null) return socket.getInetAddress().toString(); else return socket.getLocalAddress().toString(); @@ -525,7 +524,7 @@ public class Selector implements Selectable { String metricGrpName = metricGrpPrefix + "-node-metrics"; Map<String, String> tags = new LinkedHashMap<String, String>(metricTags); - tags.put("node-id", "node-"+node); + tags.put("node-id", "node-" + node); nodeRequest = this.metrics.sensor(nodeRequestName); MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags); http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 109fc96..07aba71 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -16,10 +16,6 @@ */ package org.apache.kafka.common.protocol; - -import java.util.ArrayList; -import java.util.List; - /** * Identifiers for all the Kafka APIs */ @@ -37,16 +33,18 @@ public enum ApiKeys { HEARTBEAT(12, "heartbeat"); private static ApiKeys[] codeToType; - public static int MAX_API_KEY = -1; + public static final int MAX_API_KEY; static { + int maxKey = -1; for (ApiKeys key : ApiKeys.values()) { - MAX_API_KEY = Math.max(MAX_API_KEY, key.id); + maxKey = Math.max(maxKey, key.id); } - codeToType = new ApiKeys[MAX_API_KEY+1]; + codeToType = new ApiKeys[maxKey + 1]; for (ApiKeys key : ApiKeys.values()) { codeToType[key.id] = key; } + MAX_API_KEY = maxKey; } /** the perminant and immutable id of an API--this can't change ever */