Repository: kafka Updated Branches: refs/heads/trunk 6dc974312 -> a071e3554
KAFKA-2993: Calculate compression rate at close() call. The buffer is rewound before the compression rate metric is updated, which results in a reported compression rate of 0. The fix is to add a new compressionRate field that records the latest compression rate in `close()` and to return that value to the sensor. Author: Xiao, Tao <[email protected]> Reviewers: Guozhang Wang Closes #784 from xiaotao183/kafka-2993 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a071e355 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a071e355 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a071e355 Branch: refs/heads/trunk Commit: a071e3554bf626182557dfaca491afebe9c17020 Parents: 6dc9743 Author: Tao Xiao <[email protected]> Authored: Sun Jan 17 17:14:07 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Sun Jan 17 17:14:07 2016 -0800 ---------------------------------------------------------------------- .../java/org/apache/kafka/common/record/Compressor.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a071e355/clients/src/main/java/org/apache/kafka/common/record/Compressor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java index 1aee389..c7ff2e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java @@ -88,6 +88,7 @@ public class Compressor { public long writtenUncompressed; public long numRecords; + public float compressionRate; public Compressor(ByteBuffer buffer, CompressionType type, int blockSize) { this.type = type; @@ -95,6 +96,7 @@ public class Compressor { this.numRecords = 0; 
this.writtenUncompressed = 0; + this.compressionRate = 1; if (type != CompressionType.NONE) { // for compressed records, leave space for the header and the shallow message metadata @@ -116,11 +118,7 @@ public class Compressor { } public double compressionRate() { - ByteBuffer buffer = bufferStream.buffer(); - if (this.writtenUncompressed == 0) - return 1.0; - else - return (double) buffer.position() / this.writtenUncompressed; + return compressionRate; } public void close() { @@ -151,7 +149,7 @@ public class Compressor { buffer.position(pos); // update the compression ratio - float compressionRate = (float) buffer.position() / this.writtenUncompressed; + this.compressionRate = (float) buffer.position() / this.writtenUncompressed; TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR + compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR); }
