Repository: kafka

Updated Branches:
  refs/heads/trunk a86873be5 -> 177dd7f21
KAFKA-5746; Fix conversion count computed in `downConvert`

It should be the number of records instead of the number of batches.

A few additional clean-ups:
- Minor renames
- Removed unused variable
- Some test fixes
- Ignore a flaky test

Author: Ismael Juma <[email protected]>

Reviewers: Rajini Sivaram <[email protected]>, tedyu <[email protected]>

Closes #3989 from ijuma/kafka-5746-health-metrics-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/177dd7f2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/177dd7f2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/177dd7f2

Branch: refs/heads/trunk
Commit: 177dd7f210c7783b49f47de1c7028c9830d3d795
Parents: a86873b
Author: Ismael Juma <[email protected]>
Authored: Fri Sep 29 09:53:32 2017 +0100
Committer: Rajini Sivaram <[email protected]>
Committed: Fri Sep 29 09:53:32 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/AbstractRecords.java     | 11 ++++--
 .../common/record/RecordsProcessingStats.java    | 16 ++++-----
 .../common/record/MemoryRecordsBuilderTest.java  | 28 +++++++++++-----
 core/src/main/scala/kafka/server/KafkaApis.scala |  3 +-
 .../integration/kafka/api/MetricsTest.scala      |  4 +--
 .../scala/unit/kafka/log/LogValidatorTest.scala  | 35 ++++++++++++--------
 6 files changed, 59 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
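For context on the headline fix: `downConvert` incremented its conversion counter once per converted batch, so the reported count understated the work whenever a batch held more than one record. A toy Java illustration of the difference (simplified, hypothetical types, not the real Kafka classes; the actual fix counts via MemoryRecordsBuilder.numRecords(), as the first diff below shows):

    import java.util.List;

    // Toy model of KAFKA-5746: counting converted batches vs converted records.
    // Each inner list stands in for a record batch.
    public class ConversionCountSketch {
        public static void main(String[] args) {
            List<List<String>> convertedBatches =
                    List.of(List.of("a"), List.of("b", "c"), List.of("d", "e", "f"));

            int batchCount = 0;   // what the old counter effectively reported
            int recordCount = 0;  // what the fixed counter reports
            for (List<String> batch : convertedBatches) {
                batchCount++;                 // old: one increment per batch
                recordCount += batch.size();  // new: every record in the batch
            }
            // 3 batches but 6 records: the old metric understated conversions.
            System.out.println("batches=" + batchCount + ", records=" + recordCount);
        }
    }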
http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 6a1b95c..2452798 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -59,6 +59,11 @@ public abstract class AbstractRecords implements Records {
      * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
      * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
      * correctness.
+     *
+     * The temporaryMemoryBytes computation assumes that the batches are not loaded into the heap
+     * (via classes like FileChannelRecordBatch) before this method is called. This is the case in the broker (we
+     * only load records into the heap when down converting), but it's not for the producer. However, down converting
+     * in the producer is very uncommon and the extra complexity to handle that case is not worth it.
      */
     protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends RecordBatch> batches, byte toMagic, long firstOffset, Time time) {
@@ -95,7 +100,7 @@ public abstract class AbstractRecords implements Records {
         ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
         long temporaryMemoryBytes = 0;
-        long conversionCount = 0;
+        int numRecordsConverted = 0;
         for (RecordBatchAndRecords recordBatchAndRecords : recordBatchAndRecordsList) {
             temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
             if (recordBatchAndRecords.batch.magic() <= toMagic) {
@@ -104,12 +109,12 @@
                 MemoryRecordsBuilder builder = convertRecordBatch(toMagic, buffer, recordBatchAndRecords);
                 buffer = builder.buffer();
                 temporaryMemoryBytes += builder.uncompressedBytesWritten();
-                conversionCount++;
+                numRecordsConverted += builder.numRecords();
             }
         }
 
         buffer.flip();
-        RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, conversionCount,
+        RecordsProcessingStats stats = new RecordsProcessingStats(temporaryMemoryBytes, numRecordsConverted,
                 time.nanoseconds() - startNanos);
         return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), stats);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
index cbb76b7..e104bc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
@@ -18,15 +18,15 @@ package org.apache.kafka.common.record;
 
 public class RecordsProcessingStats {
 
-    public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0L, -1);
+    public static final RecordsProcessingStats EMPTY = new RecordsProcessingStats(0L, 0, -1);
 
     private final long temporaryMemoryBytes;
-    private final long conversionCount;
+    private final int numRecordsConverted;
     private final long conversionTimeNanos;
 
-    public RecordsProcessingStats(long temporaryMemoryBytes, long conversionCount, long conversionTimeNanos) {
+    public RecordsProcessingStats(long temporaryMemoryBytes, int numRecordsConverted, long conversionTimeNanos) {
         this.temporaryMemoryBytes = temporaryMemoryBytes;
-        this.conversionCount = conversionCount;
+        this.numRecordsConverted = numRecordsConverted;
         this.conversionTimeNanos = conversionTimeNanos;
     }
 
@@ -44,8 +44,8 @@ public class RecordsProcessingStats {
         return temporaryMemoryBytes;
     }
 
-    public long conversionCount() {
-        return conversionCount;
+    public int numRecordsConverted() {
+        return numRecordsConverted;
     }
 
     public long conversionTimeNanos() {
@@ -54,7 +54,7 @@ public class RecordsProcessingStats {
 
     @Override
     public String toString() {
-        return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, conversionCount=%d, conversionTimeNanos=%d)",
-                temporaryMemoryBytes, conversionCount, conversionTimeNanos);
+        return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, numRecordsConverted=%d, conversionTimeNanos=%d)",
+                temporaryMemoryBytes, numRecordsConverted, conversionTimeNanos);
     }
 }
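The new javadoc above describes how temporaryMemoryBytes is accounted: every batch contributes its source size, and a batch that is actually converted additionally contributes the uncompressed bytes written by the builder. A back-of-the-envelope model of that accounting (the byte counts are made up for illustration):

    // Back-of-the-envelope model of the temporaryMemoryBytes accounting
    // described in the javadoc above; illustrative numbers only.
    public class TempMemorySketch {
        public static void main(String[] args) {
            long batchSizeInBytes = 1_024;         // source batch, e.g. read from disk
            long uncompressedBytesWritten = 4_096; // builder's intermediate output

            // A batch newer than the requested magic is converted: both the
            // source bytes and the builder's uncompressed buffer count.
            long convertedBatchTempBytes = batchSizeInBytes + uncompressedBytesWritten;

            // A batch already at (or below) the target magic is only copied.
            long copiedBatchTempBytes = batchSizeInBytes;

            System.out.println(convertedBatchTempBytes + " vs " + copiedBatchTempBytes);
        }
    }

This is also why the javadoc stresses that batches must not already be heap-resident when downConvert runs: if they were, sizeInBytes() would no longer approximate extra temporary memory.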
http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 4df3492..15cc508 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -446,15 +446,21 @@ public class MemoryRecordsBuilderTest {
         builder.append(10L, "1".getBytes(), "a".getBytes());
         builder.close();
 
+        int sizeExcludingTxnMarkers = buffer.position();
+
         MemoryRecords.writeEndTransactionalMarker(buffer, 1L, System.currentTimeMillis(), 0, 15L, (short) 0,
                 new EndTransactionMarker(ControlRecordType.ABORT, 0));
 
+        int position = buffer.position();
+
         builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
                 TimestampType.CREATE_TIME, 1L);
         builder.append(12L, "2".getBytes(), "b".getBytes());
         builder.append(13L, "3".getBytes(), "c".getBytes());
         builder.close();
 
+        sizeExcludingTxnMarkers += buffer.position() - position;
+
         MemoryRecords.writeEndTransactionalMarker(buffer, 14L, System.currentTimeMillis(), 0, 1L, (short) 0,
                 new EndTransactionMarker(ControlRecordType.COMMIT, 0));
 
@@ -463,8 +469,10 @@ public class MemoryRecordsBuilderTest {
         ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
                 .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
         MemoryRecords records = convertedRecords.records();
+
+        // Transactional markers are skipped when down converting to V1, so exclude them from size
         verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
-                3, 2, records.sizeInBytes(), buffer.limit());
+                3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -504,7 +512,7 @@ public class MemoryRecordsBuilderTest {
         ConvertedRecords<MemoryRecords> convertedRecords = MemoryRecords.readableRecords(buffer)
                 .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
         MemoryRecords records = convertedRecords.records();
-        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
                 records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
@@ -531,8 +539,6 @@ public class MemoryRecordsBuilderTest {
 
         convertedRecords = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L, time);
         records = convertedRecords.records();
-        verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
-                records.sizeInBytes(), buffer.limit());
 
         batches = Utils.toList(records.batches().iterator());
         logRecords = Utils.toList(records.records().iterator());
@@ -546,6 +552,8 @@ public class MemoryRecordsBuilderTest {
             assertEquals("1", utf8(logRecords.get(0).key()));
             assertEquals("2", utf8(logRecords.get(1).key()));
             assertEquals("3", utf8(logRecords.get(2).key()));
+            verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
+                    records.sizeInBytes(), buffer.limit());
         } else {
             assertEquals(2, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
@@ -554,6 +562,8 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2, batches.get(1).baseOffset());
             assertEquals("1", utf8(logRecords.get(0).key()));
             assertEquals("3", utf8(logRecords.get(1).key()));
+            verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+                    records.sizeInBytes(), buffer.limit());
         }
     }
 
@@ -634,16 +644,16 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
-    private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int recordCount, int convertedCount,
-                                              long finalBytes, long preConvertedBytes) {
+    private void verifyRecordsProcessingStats(RecordsProcessingStats processingStats, int numRecords,
+                                              int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);
-        assertEquals(convertedCount, processingStats.conversionCount());
+        assertEquals(numRecordsConverted, processingStats.numRecordsConverted());
         assertTrue("Processing time not recorded", processingStats.conversionTimeNanos() > 0);
         long tempBytes = processingStats.temporaryMemoryBytes();
         if (compressionType == CompressionType.NONE) {
-            if (convertedCount == 0)
+            if (numRecordsConverted == 0)
                 assertEquals(finalBytes, tempBytes);
-            else if (convertedCount == recordCount)
+            else if (numRecordsConverted == numRecords)
                 assertEquals(preConvertedBytes + finalBytes, tempBytes);
             else {
                 assertTrue(String.format("Unexpected temp bytes %d final %d pre %d", tempBytes, finalBytes, preConvertedBytes),
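The updated expectations in this test file follow mechanically from the new definition: where a test previously asserted the number of down-converted batches, it now asserts the number of records those batches contained. The arithmetic, with an illustrative layout (not the exact buffers built above) of two converted batches holding one and two records:

    // Illustrative arithmetic behind expectation changes like 2 -> 3 above.
    public class ExpectationSketch {
        public static void main(String[] args) {
            int[] recordsPerConvertedBatch = {1, 2}; // hypothetical batch layout

            int oldExpected = recordsPerConvertedBatch.length; // batches: 2
            int newExpected = 0;
            for (int n : recordsPerConvertedBatch)
                newExpected += n;                              // records: 3

            System.out.println(oldExpected + " -> " + newExpected);
        }
    }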
http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6781dc9..6e02164 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -518,7 +518,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
           downConvertMagic.map { magic =>
             trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
-            val startNanos = time.nanoseconds
             val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset, time)
             updateRecordsProcessingStats(request, tp, converted.recordsProcessingStats)
             new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
@@ -2013,7 +2012,7 @@
 
   private def updateRecordsProcessingStats(request: RequestChannel.Request, tp: TopicPartition,
                                            processingStats: RecordsProcessingStats): Unit = {
-    val conversionCount = processingStats.conversionCount
+    val conversionCount = processingStats.numRecordsConverted
     if (conversionCount > 0) {
       request.header.apiKey match {
         case ApiKeys.PRODUCE =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index f71a36a..0bd5075 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -18,7 +18,6 @@ import kafka.log.LogConfig
 import kafka.network.RequestMetrics
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{JaasTestUtils, TestUtils}
-
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
@@ -27,7 +26,7 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before, Test}
+import org.junit.{After, Before, Ignore, Test}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -67,6 +66,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
    * Verifies some of the metrics of producer, consumer as well as server.
    */
   @Test
+  @Ignore
  def testMetrics(): Unit = {
     val topic = "topicWithOldMessageFormat"
     val props = new Properties
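The deleted `val startNanos = time.nanoseconds` in KafkaApis was the unused variable mentioned in the commit message: `downConvert` measures its own elapsed time and returns it inside the stats object, and the metric update is skipped entirely when nothing was converted. A minimal sketch of that shape (hypothetical types, written in Java rather than the Scala above):

    // Minimal sketch of a self-timing conversion call: the callee reports its
    // own duration, so the caller needs no startNanos bookkeeping.
    // Hypothetical types; not the Kafka API.
    public class SelfTimingSketch {
        record Stats(int numRecordsConverted, long conversionTimeNanos) { }

        static Stats downConvert() {
            long startNanos = System.nanoTime();
            int converted = 3; // pretend the conversion work happens here
            return new Stats(converted, System.nanoTime() - startNanos);
        }

        public static void main(String[] args) {
            Stats stats = downConvert();
            if (stats.numRecordsConverted() > 0) // update metrics only if work was done
                System.out.println("conversion took " + stats.conversionTimeNanos() + "ns");
        }
    }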
http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index b64371e..4d8365b 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -61,7 +61,8 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be 0", 0, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = false)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+      compressed = false)
   }
 
   def testLogAppendTimeNonCompressedV2() {
@@ -101,7 +102,7 @@
     assertTrue("Message size may have been changed", validatedResults.messageSizeMaybeChanged)
 
     val stats = validatedResults.recordsProcessingStats
-    verifyRecordsProcessingStats(stats, 3, records, compressed = true)
+    verifyRecordsProcessingStats(stats, numConvertedRecords = 3, records, compressed = true)
   }
 
   @Test
@@ -142,7 +143,8 @@
       records.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+      compressed = true)
   }
 
   @Test
@@ -207,7 +209,8 @@
     assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 0, records, compressed = false)
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, numConvertedRecords = 0, records,
+      compressed = false)
   }
 
   @Test
@@ -271,7 +274,8 @@
     assertEquals("Offset of max timestamp should be 2", 2, validatingResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatingResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 3, records, compressed = true)
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -314,7 +318,8 @@
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -354,7 +359,8 @@
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", validatedResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -414,7 +420,8 @@
       validatedRecords.records.asScala.size - 1, validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", validatedResults.messageSizeMaybeChanged)
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, numConvertedRecords = 0, records,
+      compressed = true)
   }
 
   @Test
@@ -1066,17 +1073,17 @@
     }
   }
 
-  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, convertedCount: Int,
-                                   records: MemoryRecords, compressed: Boolean): Unit = {
+  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, numConvertedRecords: Int, records: MemoryRecords,
+                                   compressed: Boolean): Unit = {
     assertNotNull("Records processing info is null", stats)
-    assertEquals(convertedCount, stats.conversionCount)
-    if (stats.conversionCount > 0)
+    assertEquals(numConvertedRecords, stats.numRecordsConverted)
+    if (numConvertedRecords > 0)
       assertTrue(s"Conversion time not recorded $stats", stats.conversionTimeNanos > 0)
     val originalSize = records.sizeInBytes
     val tempBytes = stats.temporaryMemoryBytes
-    if (convertedCount > 0)
+    if (numConvertedRecords > 0 && compressed)
       assertTrue(s"Temp bytes too small, orig=$originalSize actual=$tempBytes", tempBytes > originalSize)
-    else if (compressed)
+    else if (numConvertedRecords > 0 || compressed)
       assertTrue("Temp bytes not updated", tempBytes > 0)
     else
       assertEquals(0, tempBytes)
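The reworked helper above distinguishes three cases rather than two: converting compressed data must buffer more than the original, any conversion or compression must use some temporary memory, and the uncompressed pass-through path should allocate nothing. A simplified Java paraphrase of those assertions, under the same assumed semantics:

    // Paraphrase of the verifyRecordsProcessingStats expectations above;
    // not library code, just the decision logic restated.
    public class TempBytesExpectation {
        static boolean tempBytesOk(int numConvertedRecords, boolean compressed,
                                   long tempBytes, long originalSize) {
            if (numConvertedRecords > 0 && compressed)
                return tempBytes > originalSize; // conversion + decompression buffers
            else if (numConvertedRecords > 0 || compressed)
                return tempBytes > 0;            // some temporary allocation expected
            else
                return tempBytes == 0;           // pass-through: no temp memory
        }

        public static void main(String[] args) {
            System.out.println(tempBytesOk(3, true, 2048, 1024)); // true
            System.out.println(tempBytesOk(0, false, 0, 1024));   // true
        }
    }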
