http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
deleted file mode 100644
index 5be72fe..0000000
--- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockInputStream.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.message;
-
-import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
-import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.Utils;
-
-import net.jpountz.lz4.LZ4Exception;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4SafeDecompressor;
-import net.jpountz.xxhash.XXHash32;
-import net.jpountz.xxhash.XXHashFactory;
-
-/**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
- * 
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
- */
-public final class KafkaLZ4BlockInputStream extends FilterInputStream {
-
-  public static final String PREMATURE_EOS = "Stream ended prematurely";
-  public static final String NOT_SUPPORTED = "Stream unsupported";
-  public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
-  public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
-  
-  private final LZ4SafeDecompressor decompressor;
-  private final XXHash32 checksum;
-  private final byte[] buffer;
-  private final byte[] compressedBuffer;
-  private final int maxBlockSize;
-  private FLG flg;
-  private BD bd;
-  private int bufferOffset;
-  private int bufferSize;
-  private boolean finished;
-  
-  /**
-   * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
-   * 
-   * @param in The stream to decompress
-   * @throws IOException
-   */
-  public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
-    super(in);
-    decompressor = LZ4Factory.fastestInstance().safeDecompressor();
-    checksum = XXHashFactory.fastestInstance().hash32();
-    readHeader();
-    maxBlockSize = bd.getBlockMaximumSize();
-    buffer = new byte[maxBlockSize];
-    compressedBuffer = new byte[maxBlockSize];
-    bufferOffset = 0;
-    bufferSize = 0;
-    finished = false;
-  }
-  
-  /**
-   * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
-   *  
-   * @throws IOException
-   */
-  private void readHeader() throws IOException {
-    byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
-    
-    // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
-    bufferOffset = 6;
-    if (in.read(header, 0, bufferOffset) != bufferOffset) {
-      throw new IOException(PREMATURE_EOS);
-    }
-
-    if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
-      throw new IOException(NOT_SUPPORTED);
-    }
-    flg = FLG.fromByte(header[bufferOffset-2]);
-    bd = BD.fromByte(header[bufferOffset-1]);
-    // TODO read uncompressed content size, update flg.validate()
-    // TODO read dictionary id, update flg.validate()
-    
-    // check stream descriptor hash
-    byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
-    header[bufferOffset++] = (byte) in.read();
-    if (hash != header[bufferOffset-1]) {
-      throw new IOException(DESCRIPTOR_HASH_MISMATCH);
-    }
-  }
-
-  /**
-   * Decompresses (if necessary) buffered data, optionally computes and validates an XXHash32 checksum,
-   * and writes the result to a buffer.
-   * 
-   * @throws IOException
-   */
-  private void readBlock() throws IOException {
-    int blockSize = Utils.readUnsignedIntLE(in);
-
-    // Check for EndMark
-    if (blockSize == 0) {
-      finished = true;
-      // TODO implement content checksum, update flg.validate()
-      return;
-    } else if (blockSize > maxBlockSize) {
-      throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
-    }
-    
-    boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
-    byte[] bufferToRead;
-    if (compressed) {
-      bufferToRead = compressedBuffer;
-    } else {
-      blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
-      bufferToRead = buffer;
-      bufferSize = blockSize;
-    }
-    
-    if (in.read(bufferToRead, 0, blockSize) != blockSize) {
-      throw new IOException(PREMATURE_EOS);
-    }
-
-    // verify checksum
-    if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
-      throw new IOException(BLOCK_HASH_MISMATCH);
-    }
-
-    if (compressed) {
-      try {
-        bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
-      } catch (LZ4Exception e) {
-        throw new IOException(e);
-      }
-    }
-
-    bufferOffset = 0;
-  }
- 
-  @Override
-  public int read() throws IOException {
-    if (finished) {
-      return -1;
-    }
-    if (available() == 0) {
-      readBlock();
-    }
-    if (finished) {
-      return -1;
-    }
-    int value = buffer[bufferOffset++] & 0xFF;
-    
-    return value;
-  }
-
-  @Override
-  public int read(byte b[], int off, int len) throws IOException {
-    net.jpountz.util.Utils.checkRange(b, off, len);
-    if (finished) {
-      return -1;
-    }
-    if (available() == 0) {
-      readBlock();
-    }
-    if (finished) {
-      return -1;
-    }
-    len = Math.min(len, available());
-    System.arraycopy(buffer, bufferOffset, b, off, len);
-    bufferOffset += len;
-    return len;
-  }
-
-  @Override
-  public long skip(long n) throws IOException {
-    if (finished) {
-      return 0;
-    }
-    if (available() == 0) {
-      readBlock();
-    }
-    if (finished) {
-      return 0;
-    }
-    n = Math.min(n, available());
-    bufferOffset += n;
-    return n;
-  }
-
-  @Override
-  public int available() throws IOException {
-    return bufferSize - bufferOffset;
-  }
-
-  @Override
-  public void close() throws IOException {
-      in.close();
-  }
-
-  @Override
-  public synchronized void mark(int readlimit) {
-      throw new RuntimeException("mark not supported");
-  }
-
-  @Override
-  public synchronized void reset() throws IOException {
-      throw new RuntimeException("reset not supported");
-  }
-
-  @Override
-  public boolean markSupported() {
-      return false;
-  }
-
-}
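
For reference, the frame header readHeader() consumes is 7 bytes: the magic number (4 bytes, little-endian), the FLG and BD descriptor bytes, and a 1-byte descriptor checksum. A minimal sketch of building one by hand, assuming lz4-java (net.jpountz) is on the classpath; note that, like the code above, it hashes the magic bytes into the descriptor checksum:

    import net.jpountz.xxhash.XXHash32;
    import net.jpountz.xxhash.XXHashFactory;

    public class Lz4FrameHeaderSketch {
        public static void main(String[] args) {
            XXHash32 checksum = XXHashFactory.fastestInstance().hash32();
            byte[] header = new byte[7];
            // magic 0x184D2204, written little-endian
            header[0] = 0x04; header[1] = 0x22; header[2] = 0x4D; header[3] = 0x18;
            header[4] = 0x60; // FLG: version=01 (bits 6-7), block independence=1 (bit 5)
            header[5] = 0x40; // BD: block maximum size value 4, i.e. 64 KB
            // HC: second byte of the xxhash32 over magic+FLG+BD, matching
            // writeHeader()/readHeader() above
            header[6] = (byte) ((checksum.hash(header, 0, 6, 0) >> 8) & 0xFF);
            System.out.printf("FLG=0x%02X BD=0x%02X HC=0x%02X%n",
                header[4], header[5], header[6]);
        }
    }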

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
deleted file mode 100644
index e5b9e43..0000000
--- a/clients/src/main/java/org/apache/kafka/common/message/KafkaLZ4BlockOutputStream.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.message;
-
-import java.io.FilterOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.kafka.common.utils.Utils;
-
-import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.xxhash.XXHash32;
-import net.jpountz.xxhash.XXHashFactory;
-
-/**
- * A partial implementation of the v1.4.1 LZ4 Frame format.
- * 
- * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
- */
-public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
-
-  public static final int MAGIC = 0x184D2204;
-  public static final int LZ4_MAX_HEADER_LENGTH = 19;
-  public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
-  
-  public static final String CLOSED_STREAM = "The stream is already closed";
-  
-  public static final int BLOCKSIZE_64KB = 4;
-  public static final int BLOCKSIZE_256KB = 5;
-  public static final int BLOCKSIZE_1MB = 6;
-  public static final int BLOCKSIZE_4MB = 7;
-  
-  private final LZ4Compressor compressor;
-  private final XXHash32 checksum;
-  private final FLG flg;
-  private final BD bd;
-  private final byte[] buffer;
-  private final byte[] compressedBuffer;
-  private final int maxBlockSize;
-  private int bufferOffset;
-  private boolean finished;
-
-  /**
-   * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
-   *
-   * @param out The output stream to compress
-   * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
-   * @param blockChecksum Default: false. When true, an XXHash32 checksum is computed and appended to the stream for every block of data
-   * @throws IOException
-   */
-  public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
-    super(out);
-    compressor = LZ4Factory.fastestInstance().fastCompressor();
-    checksum = XXHashFactory.fastestInstance().hash32();
-    bd = new BD(blockSize);
-    flg = new FLG(blockChecksum);
-    bufferOffset = 0;
-    maxBlockSize = bd.getBlockMaximumSize();
-    buffer = new byte[maxBlockSize];
-    compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
-    finished = false;
-    writeHeader();
-  }
-  
-  /**
-   * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
-   *  
-   * @param out The stream to compress
-   * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other values will generate an exception
-   * @throws IOException
-   */
-  public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
-    this(out, blockSize, false);
-  }
-  
-  /**
-   * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
-   * 
-   * @param out The output stream to compress
-   * @throws IOException
-   */
-  public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
-    this(out, BLOCKSIZE_64KB);
-  }
-
-  /**
-   * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
-   *  
-   * @throws IOException
-   */
-  private void writeHeader() throws IOException {
-    Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
-    bufferOffset = 4;
-    buffer[bufferOffset++] = flg.toByte();
-    buffer[bufferOffset++] = bd.toByte();
-    // TODO write uncompressed content size, update flg.validate()
-    // TODO write dictionary id, update flg.validate()
-    // compute checksum on all descriptor fields 
-    int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
-    buffer[bufferOffset++] = (byte) hash;
-    // write out frame descriptor
-    out.write(buffer, 0, bufferOffset);
-    bufferOffset = 0;
-  }
-  
-  /**
-   * Compresses buffered data, optionally computes an XXHash32 checksum, and writes
-   * the result to the underlying {@link OutputStream}.
-   * 
-   * @throws IOException
-   */
-  private void writeBlock() throws IOException {
-    if (bufferOffset == 0) {
-      return;
-    }
-    
-    int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
-    byte[] bufferToWrite = compressedBuffer;
-    int compressMethod = 0;
-    
-    // Store block uncompressed if compressed length is greater (incompressible)
-    if (compressedLength >= bufferOffset) {
-      bufferToWrite = buffer;
-      compressedLength = bufferOffset;
-      compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
-    }
-
-    // Write content
-    Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
-    out.write(bufferToWrite, 0, compressedLength);
-    
-    // Calculate and write block checksum
-    if (flg.isBlockChecksumSet()) {
-      int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
-      Utils.writeUnsignedIntLE(out, hash);
-    }
-    bufferOffset = 0;
-  }
-  
-  /**
-   * Similar to the {@link #writeBlock()} method.  Writes a 0-length block 
-   * (without block checksum) to signal the end of the block stream.
-   * 
-   * @throws IOException
-   */
-  private void writeEndMark() throws IOException {
-    Utils.writeUnsignedIntLE(out, 0);
-    // TODO implement content checksum, update flg.validate()
-    finished = true;
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    ensureNotFinished();
-    if (bufferOffset == maxBlockSize) {
-      writeBlock();
-    }
-    buffer[bufferOffset++] = (byte) b;
-  }
-  
-  @Override
-  public void write(byte[] b, int off, int len) throws IOException {
-    net.jpountz.util.Utils.checkRange(b, off, len);
-    ensureNotFinished();
-     
-    int bufferRemainingLength = maxBlockSize - bufferOffset;
-    // while b will fill the buffer
-    while (len > bufferRemainingLength) {
-      // fill remaining space in buffer 
-      System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
-      bufferOffset = maxBlockSize;
-      writeBlock();
-      // compute new offset and length
-      off += bufferRemainingLength;
-      len -= bufferRemainingLength;
-      bufferRemainingLength = maxBlockSize;
-    }
-    
-    System.arraycopy(b, off, buffer, bufferOffset, len);
-    bufferOffset += len;
-  }
-
-  @Override
-  public void flush() throws IOException {
-    if (!finished) {
-        writeBlock();
-    }
-    if (out != null) {
-      out.flush();
-    }
-  }
-
-  /**
-   * A simple state check to ensure the stream is still open.
-   */
-  private void ensureNotFinished() {
-    if (finished) {
-      throw new IllegalStateException(CLOSED_STREAM);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!finished) {
-      writeEndMark();
-      flush();
-      finished = true;
-    }
-    if (out != null) {
-      out.close();
-      out = null;
-    }
-  }
-
-  public static class FLG {
-    
-    private static final int VERSION = 1;
-    
-    private final int presetDictionary;
-    private final int reserved1;
-    private final int contentChecksum;
-    private final int contentSize;
-    private final int blockChecksum;
-    private final int blockIndependence;
-    private final int version;
-    
-    public FLG() {
-      this(false);
-    }
-    
-    public FLG(boolean blockChecksum) {
-      this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
-    }
-    
-    private FLG(int presetDictionary, int reserved1, int contentChecksum, 
-        int contentSize, int blockChecksum, int blockIndependence, int version) {
-      this.presetDictionary = presetDictionary;
-      this.reserved1 = reserved1;
-      this.contentChecksum = contentChecksum;
-      this.contentSize = contentSize;
-      this.blockChecksum = blockChecksum;
-      this.blockIndependence = blockIndependence;
-      this.version = version;
-      validate();
-    }
-    
-    public static FLG fromByte(byte flg) {
-      int presetDictionary =  (flg >>> 0) & 1;
-      int reserved1 =         (flg >>> 1) & 1;
-      int contentChecksum =   (flg >>> 2) & 1;
-      int contentSize =       (flg >>> 3) & 1;
-      int blockChecksum =     (flg >>> 4) & 1;
-      int blockIndependence = (flg >>> 5) & 1;
-      int version =           (flg >>> 6) & 3;
-      
-      return new FLG(presetDictionary, reserved1, contentChecksum, 
-          contentSize, blockChecksum, blockIndependence, version);
-    }
-    
-    public byte toByte() {
-      return (byte) (
-            ((presetDictionary   & 1) << 0)
-          | ((reserved1          & 1) << 1)
-          | ((contentChecksum    & 1) << 2)
-          | ((contentSize        & 1) << 3)
-          | ((blockChecksum      & 1) << 4)
-          | ((blockIndependence  & 1) << 5)
-          | ((version            & 3) << 6) );
-    }
-    
-    private void validate() {
-      if (presetDictionary != 0) {
-        throw new RuntimeException("Preset dictionary is unsupported");
-      }
-      if (reserved1 != 0) {
-        throw new RuntimeException("Reserved1 field must be 0");
-      }
-      if (contentChecksum != 0) {
-        throw new RuntimeException("Content checksum is unsupported");
-      }
-      if (contentSize != 0) {
-        throw new RuntimeException("Content size is unsupported");
-      }
-      if (blockIndependence != 1) {
-        throw new RuntimeException("Dependent block stream is unsupported");
-      }
-      if (version != VERSION) {
-        throw new RuntimeException(String.format("Version %d is unsupported", version));
-      }
-    }
-    
-    public boolean isPresetDictionarySet() {
-      return presetDictionary == 1;
-    }
-    
-    public boolean isContentChecksumSet() {
-      return contentChecksum == 1;
-    }
-    
-    public boolean isContentSizeSet() {
-      return contentSize == 1;
-    }
-    
-    public boolean isBlockChecksumSet() {
-      return blockChecksum == 1;
-    }
-    
-    public boolean isBlockIndependenceSet() {
-      return blockIndependence == 1;
-    }
-    
-    public int getVersion() {
-      return version;
-    }
-  }
-  
-  public static class BD {
-    
-    private final int reserved2;
-    private final int blockSizeValue;
-    private final int reserved3;
-    
-    public BD() {
-      this(0, BLOCKSIZE_64KB, 0);
-    }
-    
-    public BD(int blockSizeValue) {
-      this(0, blockSizeValue, 0);
-    }
-    
-    private BD(int reserved2, int blockSizeValue, int reserved3) {
-      this.reserved2 = reserved2;
-      this.blockSizeValue = blockSizeValue;
-      this.reserved3 = reserved3;
-      validate();
-    }
-    
-    public static BD fromByte(byte bd) {
-      int reserved2 =        (bd >>> 0) & 15;
-      int blockMaximumSize = (bd >>> 4) & 7;
-      int reserved3 =        (bd >>> 7) & 1;
-      
-      return new BD(reserved2, blockMaximumSize, reserved3);
-    }
-    
-    private void validate() {
-      if (reserved2 != 0) {
-        throw new RuntimeException("Reserved2 field must be 0");
-      }
-      if (blockSizeValue < 4 || blockSizeValue > 7) {
-        throw new RuntimeException("Block size value must be between 4 and 7");
-      }
-      if (reserved3 != 0) {
-        throw new RuntimeException("Reserved3 field must be 0");
-      }
-    }
-    
-    // 2^(2n+8)
-    public int getBlockMaximumSize() {
-      return (1 << ((2 * blockSizeValue) + 8));
-    }
-    
-    public byte toByte() {
-      return (byte) (
-            ((reserved2       & 15) << 0)
-          | ((blockSizeValue  & 7) << 4)
-          | ((reserved3       & 1) << 7) );
-    }
-  }
-  
-}
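
The two stream classes in this commit form a matched pair. A hypothetical round-trip sketch (the driver class is illustrative and assumes both classes remain available on the classpath after this change):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.message.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) throws Exception {
            byte[] payload = "hello kafka lz4".getBytes(StandardCharsets.UTF_8);

            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(
                sink, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, true); // per-block checksums on
            out.write(payload);
            out.close(); // writes the end mark, then closes the sink

            KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(
                new ByteArrayInputStream(sink.toByteArray()));
            byte[] decoded = new byte[payload.length];
            int n = in.read(decoded, 0, decoded.length);
            System.out.println(new String(decoded, 0, n, StandardCharsets.UTF_8));
        }
    }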

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 9c20538..6b9590c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
 public class JmxReporter implements MetricsReporter {
 
     private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
-    private static final Object lock = new Object();
+    private static final Object LOCK = new Object();
     private String prefix;
     private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
 
@@ -58,12 +58,11 @@ public class JmxReporter implements MetricsReporter {
     }
 
     @Override
-    public void configure(Map<String, ?> configs) {
-    }
+    public void configure(Map<String, ?> configs) {}
 
     @Override
     public void init(List<KafkaMetric> metrics) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             for (KafkaMetric metric : metrics)
                 addAttribute(metric);
             for (KafkaMbean mbean : mbeans.values())
@@ -73,7 +72,7 @@ public class JmxReporter implements MetricsReporter {
 
     @Override
     public void metricChange(KafkaMetric metric) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             KafkaMbean mbean = addAttribute(metric);
             reregister(mbean);
         }
@@ -86,36 +85,35 @@ public class JmxReporter implements MetricsReporter {
             if (!this.mbeans.containsKey(mBeanName))
                 mbeans.put(mBeanName, new KafkaMbean(mBeanName));
             KafkaMbean mbean = this.mbeans.get(mBeanName);
-            mbean.setAttribute(metricName.name() , metric);
+            mbean.setAttribute(metricName.name(), metric);
             return mbean;
         } catch (JMException e) {
             throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
         }
     }
 
-  /**
-   * @param metricName
-   * @return standard JMX MBean name in the following format
-   *       domainName:type=metricType,key1=val1,key2=val2
-   */
-  private String getMBeanName(MetricName metricName) {
-    StringBuilder mBeanName = new StringBuilder();
-    mBeanName.append(prefix);
-    mBeanName.append(":type=");
-    mBeanName.append(metricName.group());
-    for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
-      if(entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
-         continue;
-      mBeanName.append(",");
-      mBeanName.append(entry.getKey());
-      mBeanName.append("=");
-      mBeanName.append(entry.getValue());
+    /**
+     * @param metricName
+     * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
+     */
+    private String getMBeanName(MetricName metricName) {
+        StringBuilder mBeanName = new StringBuilder();
+        mBeanName.append(prefix);
+        mBeanName.append(":type=");
+        mBeanName.append(metricName.group());
+        for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
+            if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
+                continue;
+            mBeanName.append(",");
+            mBeanName.append(entry.getKey());
+            mBeanName.append("=");
+            mBeanName.append(entry.getValue());
+        }
+        return mBeanName.toString();
     }
-    return mBeanName.toString();
-  }
 
     public void close() {
-        synchronized (lock) {
+        synchronized (LOCK) {
             for (KafkaMbean mbean : this.mbeans.values())
                 unregister(mbean);
         }
@@ -185,7 +183,12 @@ public class JmxReporter implements MetricsReporter {
             for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
                 String attribute = entry.getKey();
                 KafkaMetric metric = entry.getValue();
-                attrs[i] = new MBeanAttributeInfo(attribute, double.class.getName(), metric.metricName().description(), true, false, false);
+                attrs[i] = new MBeanAttributeInfo(attribute,
+                                                  double.class.getName(),
+                                                  metric.metricName().description(),
+                                                  true,
+                                                  false,
+                                                  false);
                 i += 1;
             }
             return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
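
As an illustration of the name format getMBeanName() builds: with prefix "kafka.producer", metric group "producer-node-metrics", and tags client-id=my-client and node-id=node-1, the result is kafka.producer:type=producer-node-metrics,client-id=my-client,node-id=node-1 (values here are hypothetical). A small sketch confirming that shape parses as a valid JMX ObjectName:

    import javax.management.ObjectName;

    public class MBeanNameSketch {
        public static void main(String[] args) throws Exception {
            // Same domain:type=group,key1=val1,key2=val2 shape built above
            ObjectName name = new ObjectName(
                "kafka.producer:type=producer-node-metrics,client-id=my-client,node-id=node-1");
            System.out.println(name.getDomain() + " " + name.getKeyPropertyList());
        }
    }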

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index e53cfaa..ca823fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -147,7 +147,7 @@ public final class Sensor {
      * @param stat The statistic to keep
      */
     public void add(MetricName metricName, MeasurableStat stat) {
-      add(metricName, stat, null);
+        add(metricName, stat, null);
     }
 
     /**
@@ -157,11 +157,11 @@ public final class Sensor {
      * @param config A special configuration for this metric. If null use the sensor default configuration.
      */
     public synchronized void add(MetricName metricName, MeasurableStat stat, MetricConfig config) {
-      KafkaMetric metric = new KafkaMetric(new Object(),
-                                      Utils.notNull(metricName),
-                                      Utils.notNull(stat),
-                                      config == null ? this.config : config,
-                                      time);
+        KafkaMetric metric = new KafkaMetric(new Object(),
+                                             Utils.notNull(metricName),
+                                             Utils.notNull(stat),
+                                             config == null ? this.config : config,
+                                             time);
         this.registry.registerMetric(metric);
         this.metrics.add(metric);
         this.stats.add(stat);
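
A hypothetical usage sketch for the two add(...) overloads above; the two-argument form delegates with a null config, so the sensor's default MetricConfig applies (metric names here are illustrative):

    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;

    public class SensorSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            Sensor sensor = metrics.sensor("request-latency");
            // Falls through to add(metricName, stat, null): default config is used
            sensor.add(new MetricName("latency-avg", "request-metrics"), new Avg());
            sensor.record(42.0);
        }
    }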

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index a5838b3..98429da 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -71,7 +71,7 @@ public class Rate implements MeasurableStat {
             case MILLISECONDS:
                 return time;
             case SECONDS:
-                return time / (1000.0);
+                return time / 1000.0;
             case MINUTES:
                 return time / (60.0 * 1000.0);
             case HOURS:

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index dcc639a..fc0d168 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -50,7 +50,7 @@ public class NetworkReceive implements Receive {
 
     @Override
     public ByteBuffer[] reify() {
-        return new ByteBuffer[] { this.buffer };
+        return new ByteBuffer[] {this.buffer};
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index e18a769..6baad93 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -14,7 +14,6 @@ package org.apache.kafka.common.network;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.CancelledKeyException;
@@ -275,7 +274,7 @@ public class Selector implements Selectable {
                     }
                 } catch (IOException e) {
                     String desc = socketDescription(channel);
-                    if(e instanceof EOFException)
+                    if (e instanceof EOFException)
                         log.info("Connection {} disconnected", desc);
                     else
                         log.warn("Error in I/O with connection to {}", desc, e);
@@ -290,9 +289,9 @@ public class Selector implements Selectable {
     
     private String socketDescription(SocketChannel channel) {
         Socket socket = channel.socket();
-        if(socket == null)
+        if (socket == null)
             return "[unconnected socket]";
-        else if(socket.getInetAddress() != null)
+        else if (socket.getInetAddress() != null)
             return socket.getInetAddress().toString();
         else
             return socket.getLocalAddress().toString();
@@ -525,7 +524,7 @@ public class Selector implements Selectable {
                     String metricGrpName = metricGrpPrefix + "-node-metrics";
 
                     Map<String, String> tags = new LinkedHashMap<String, String>(metricTags);
-                    tags.put("node-id", "node-"+node);
+                    tags.put("node-id", "node-" + node);
 
                     nodeRequest = this.metrics.sensor(nodeRequestName);
                     MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 109fc96..07aba71 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.common.protocol;
 
-
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Identifiers for all the Kafka APIs
  */
@@ -37,16 +33,18 @@ public enum ApiKeys {
     HEARTBEAT(12, "heartbeat");
 
     private static ApiKeys[] codeToType;
-    public static int MAX_API_KEY = -1;
+    public static final int MAX_API_KEY;
 
     static {
+        int maxKey = -1;
         for (ApiKeys key : ApiKeys.values()) {
-            MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
+            maxKey = Math.max(maxKey, key.id);
         }
-        codeToType = new ApiKeys[MAX_API_KEY+1];
+        codeToType = new ApiKeys[maxKey + 1];
         for (ApiKeys key : ApiKeys.values()) {
             codeToType[key.id] = key;
         }
+        MAX_API_KEY = maxKey;
     }
 
     /** the permanent and immutable id of an API--this can't change ever */
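
The change above replaces a mutable public MAX_API_KEY, previously assigned during class initialization but reassignable by any caller, with a blank final computed once in the static block. The same idiom in isolation (a hypothetical enum, not part of Kafka):

    public enum SketchKeys {
        A(0), B(3), C(7);

        public final int id;
        public static final int MAX_ID;

        static {
            // Enum constants are constructed before this block runs,
            // so values() is safe to use here
            int max = -1;
            for (SketchKeys k : values())
                max = Math.max(max, k.id);
            MAX_ID = max; // a blank final may be assigned exactly once
        }

        SketchKeys(int id) {
            this.id = id;
        }
    }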
