Repository: kafka
Updated Branches:
  refs/heads/trunk a86873be5 -> 177dd7f21


KAFKA-5746; Fix conversion count computed in `downConvert`

It should be the number of records instead of the
number of batches.

A few additional clean-ups:
- Minor renames
- Removed unused variable
- Some test fixes
- Ignore a flaky test

Author: Ismael Juma <[email protected]>

Reviewers: Rajini Sivaram <[email protected]>, tedyu 
<[email protected]>

Closes #3989 from ijuma/kafka-5746-health-metrics-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/177dd7f2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/177dd7f2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/177dd7f2

Branch: refs/heads/trunk
Commit: 177dd7f210c7783b49f47de1c7028c9830d3d795
Parents: a86873b
Author: Ismael Juma <[email protected]>
Authored: Fri Sep 29 09:53:32 2017 +0100
Committer: Rajini Sivaram <[email protected]>
Committed: Fri Sep 29 09:53:32 2017 +0100

----------------------------------------------------------------------
 .../kafka/common/record/AbstractRecords.java    | 11 ++++--
 .../common/record/RecordsProcessingStats.java   | 16 ++++-----
 .../common/record/MemoryRecordsBuilderTest.java | 28 +++++++++++-----
 .../src/main/scala/kafka/server/KafkaApis.scala |  3 +-
 .../integration/kafka/api/MetricsTest.scala     |  4 +--
 .../scala/unit/kafka/log/LogValidatorTest.scala | 35 ++++++++++++--------
 6 files changed, 59 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 6a1b95c..2452798 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -59,6 +59,11 @@ public abstract class AbstractRecords implements Records {
      * If a client requests records in v1 format starting from the middle of 
an uncompressed batch in v2 format, we
      * need to drop records from the batch during the conversion. Some 
versions of librdkafka rely on this for
      * correctness.
+     *
+     * The temporaryMemoryBytes computation assumes that the batches are not 
loaded into the heap
+     * (via classes like FileChannelRecordBatch) before this method is called. 
This is the case in the broker (we
+     * only load records into the heap when down converting), but it's not for 
the producer. However, down converting
+     * in the producer is very uncommon and the extra complexity to handle 
that case is not worth it.
      */
     protected ConvertedRecords<MemoryRecords> downConvert(Iterable<? extends 
RecordBatch> batches, byte toMagic,
             long firstOffset, Time time) {
@@ -95,7 +100,7 @@ public abstract class AbstractRecords implements Records {
 
         ByteBuffer buffer = ByteBuffer.allocate(totalSizeEstimate);
         long temporaryMemoryBytes = 0;
-        long conversionCount = 0;
+        int numRecordsConverted = 0;
         for (RecordBatchAndRecords recordBatchAndRecords : 
recordBatchAndRecordsList) {
             temporaryMemoryBytes += recordBatchAndRecords.batch.sizeInBytes();
             if (recordBatchAndRecords.batch.magic() <= toMagic) {
@@ -104,12 +109,12 @@ public abstract class AbstractRecords implements Records {
                 MemoryRecordsBuilder builder = convertRecordBatch(toMagic, 
buffer, recordBatchAndRecords);
                 buffer = builder.buffer();
                 temporaryMemoryBytes += builder.uncompressedBytesWritten();
-                conversionCount++;
+                numRecordsConverted += builder.numRecords();
             }
         }
 
         buffer.flip();
-        RecordsProcessingStats stats = new 
RecordsProcessingStats(temporaryMemoryBytes, conversionCount,
+        RecordsProcessingStats stats = new 
RecordsProcessingStats(temporaryMemoryBytes, numRecordsConverted,
                 time.nanoseconds() - startNanos);
         return new ConvertedRecords<>(MemoryRecords.readableRecords(buffer), 
stats);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
 
b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
index cbb76b7..e104bc8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/RecordsProcessingStats.java
@@ -18,15 +18,15 @@ package org.apache.kafka.common.record;
 
 public class RecordsProcessingStats {
 
-    public static final RecordsProcessingStats EMPTY = new 
RecordsProcessingStats(0L, 0L, -1);
+    public static final RecordsProcessingStats EMPTY = new 
RecordsProcessingStats(0L, 0, -1);
 
     private final long temporaryMemoryBytes;
-    private final long conversionCount;
+    private final int numRecordsConverted;
     private final long conversionTimeNanos;
 
-    public RecordsProcessingStats(long temporaryMemoryBytes, long 
conversionCount, long conversionTimeNanos) {
+    public RecordsProcessingStats(long temporaryMemoryBytes, int 
numRecordsConverted, long conversionTimeNanos) {
         this.temporaryMemoryBytes = temporaryMemoryBytes;
-        this.conversionCount = conversionCount;
+        this.numRecordsConverted = numRecordsConverted;
         this.conversionTimeNanos = conversionTimeNanos;
     }
 
@@ -44,8 +44,8 @@ public class RecordsProcessingStats {
         return temporaryMemoryBytes;
     }
 
-    public long conversionCount() {
-        return conversionCount;
+    public int numRecordsConverted() {
+        return numRecordsConverted;
     }
 
     public long conversionTimeNanos() {
@@ -54,7 +54,7 @@ public class RecordsProcessingStats {
 
     @Override
     public String toString() {
-        return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, 
conversionCount=%d, conversionTimeNanos=%d)",
-                temporaryMemoryBytes, conversionCount, conversionTimeNanos);
+        return String.format("RecordsProcessingStats(temporaryMemoryBytes=%d, 
numRecordsConverted=%d, conversionTimeNanos=%d)",
+                temporaryMemoryBytes, numRecordsConverted, 
conversionTimeNanos);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 4df3492..15cc508 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -446,15 +446,21 @@ public class MemoryRecordsBuilderTest {
         builder.append(10L, "1".getBytes(), "a".getBytes());
         builder.close();
 
+        int sizeExcludingTxnMarkers = buffer.position();
+
         MemoryRecords.writeEndTransactionalMarker(buffer, 1L, 
System.currentTimeMillis(), 0, 15L, (short) 0,
                 new EndTransactionMarker(ControlRecordType.ABORT, 0));
 
+        int position = buffer.position();
+
         builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
compressionType,
                 TimestampType.CREATE_TIME, 1L);
         builder.append(12L, "2".getBytes(), "b".getBytes());
         builder.append(13L, "3".getBytes(), "c".getBytes());
         builder.close();
 
+        sizeExcludingTxnMarkers += buffer.position() - position;
+
         MemoryRecords.writeEndTransactionalMarker(buffer, 14L, 
System.currentTimeMillis(), 0, 1L, (short) 0,
                 new EndTransactionMarker(ControlRecordType.COMMIT, 0));
 
@@ -463,8 +469,10 @@ public class MemoryRecordsBuilderTest {
         ConvertedRecords<MemoryRecords> convertedRecords = 
MemoryRecords.readableRecords(buffer)
                 .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
         MemoryRecords records = convertedRecords.records();
+
+        // Transactional markers are skipped when down converting to V1, so 
exclude them from size
         verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(),
-                3, 2, records.sizeInBytes(), buffer.limit());
+                3, 3, records.sizeInBytes(), sizeExcludingTxnMarkers);
 
         List<? extends RecordBatch> batches = 
Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -504,7 +512,7 @@ public class MemoryRecordsBuilderTest {
         ConvertedRecords<MemoryRecords> convertedRecords = 
MemoryRecords.readableRecords(buffer)
                 .downConvert(RecordBatch.MAGIC_VALUE_V1, 0, time);
         MemoryRecords records = convertedRecords.records();
-        
verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+        
verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
                 records.sizeInBytes(), buffer.limit());
 
         List<? extends RecordBatch> batches = 
Utils.toList(records.batches().iterator());
@@ -531,8 +539,6 @@ public class MemoryRecordsBuilderTest {
 
         convertedRecords = 
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 
2L, time);
         records = convertedRecords.records();
-        
verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
-                records.sizeInBytes(), buffer.limit());
 
         batches = Utils.toList(records.batches().iterator());
         logRecords = Utils.toList(records.records().iterator());
@@ -546,6 +552,8 @@ public class MemoryRecordsBuilderTest {
             assertEquals("1", utf8(logRecords.get(0).key()));
             assertEquals("2", utf8(logRecords.get(1).key()));
             assertEquals("3", utf8(logRecords.get(2).key()));
+            
verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 2,
+                    records.sizeInBytes(), buffer.limit());
         } else {
             assertEquals(2, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
@@ -554,6 +562,8 @@ public class MemoryRecordsBuilderTest {
             assertEquals(2, batches.get(1).baseOffset());
             assertEquals("1", utf8(logRecords.get(0).key()));
             assertEquals("3", utf8(logRecords.get(1).key()));
+            
verifyRecordsProcessingStats(convertedRecords.recordsProcessingStats(), 3, 1,
+                    records.sizeInBytes(), buffer.limit());
         }
     }
 
@@ -634,16 +644,16 @@ public class MemoryRecordsBuilderTest {
         return values;
     }
 
-    private void verifyRecordsProcessingStats(RecordsProcessingStats 
processingStats, int recordCount, int convertedCount,
-            long finalBytes, long preConvertedBytes) {
+    private void verifyRecordsProcessingStats(RecordsProcessingStats 
processingStats, int numRecords,
+            int numRecordsConverted, long finalBytes, long preConvertedBytes) {
         assertNotNull("Records processing info is null", processingStats);
-        assertEquals(convertedCount, processingStats.conversionCount());
+        assertEquals(numRecordsConverted, 
processingStats.numRecordsConverted());
         assertTrue("Processing time not recorded", 
processingStats.conversionTimeNanos() > 0);
         long tempBytes = processingStats.temporaryMemoryBytes();
         if (compressionType == CompressionType.NONE) {
-            if (convertedCount == 0)
+            if (numRecordsConverted == 0)
                 assertEquals(finalBytes, tempBytes);
-            else if (convertedCount == recordCount)
+            else if (numRecordsConverted == numRecords)
                 assertEquals(preConvertedBytes + finalBytes, tempBytes);
             else {
                 assertTrue(String.format("Unexpected temp bytes %d final %d 
pre %d", tempBytes, finalBytes, preConvertedBytes),

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6781dc9..6e02164 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -518,7 +518,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
         downConvertMagic.map { magic =>
           trace(s"Down converting records from partition $tp to message format 
version $magic for fetch request from $clientId")
-          val startNanos = time.nanoseconds
           val converted = data.records.downConvert(magic, 
fetchRequest.fetchData.get(tp).fetchOffset, time)
           updateRecordsProcessingStats(request, tp, 
converted.recordsProcessingStats)
           new FetchResponse.PartitionData(data.error, data.highWatermark, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
@@ -2013,7 +2012,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   private def updateRecordsProcessingStats(request: RequestChannel.Request, 
tp: TopicPartition,
                                            processingStats: 
RecordsProcessingStats): Unit = {
-    val conversionCount = processingStats.conversionCount
+    val conversionCount = processingStats.numRecordsConverted
     if (conversionCount > 0) {
       request.header.apiKey match {
         case ApiKeys.PRODUCE =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index f71a36a..0bd5075 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -18,7 +18,6 @@ import kafka.log.LogConfig
 import kafka.network.RequestMetrics
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{JaasTestUtils, TestUtils}
-
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
@@ -27,7 +26,7 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before, Test}
+import org.junit.{After, Before, Ignore, Test}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -67,6 +66,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
    * Verifies some of the metrics of producer, consumer as well as server.
    */
   @Test
+  @Ignore
   def testMetrics(): Unit = {
     val topic = "topicWithOldMessageFormat"
     val props = new Properties

http://git-wip-us.apache.org/repos/asf/kafka/blob/177dd7f2/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala 
b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
index b64371e..4d8365b 100644
--- a/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogValidatorTest.scala
@@ -61,7 +61,8 @@ class LogValidatorTest {
     assertEquals(s"The offset of max timestamp should be 0", 0, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, 
records, compressed = false)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 
numConvertedRecords = 0, records,
+      compressed = false)
   }
 
   def testLogAppendTimeNonCompressedV2() {
@@ -101,7 +102,7 @@ class LogValidatorTest {
     assertTrue("Message size may have been changed", 
validatedResults.messageSizeMaybeChanged)
 
     val stats = validatedResults.recordsProcessingStats
-    verifyRecordsProcessingStats(stats, 3, records, compressed = true)
+    verifyRecordsProcessingStats(stats, numConvertedRecords = 3, records, 
compressed = true)
   }
 
   @Test
@@ -142,7 +143,8 @@ class LogValidatorTest {
       records.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, 
records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 
numConvertedRecords = 0, records,
+      compressed = true)
   }
 
   @Test
@@ -207,7 +209,8 @@ class LogValidatorTest {
     assertEquals(s"Offset of max timestamp should be 1", 1, 
validatingResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatingResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 0, 
records, compressed = false)
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 
numConvertedRecords = 0, records,
+      compressed = false)
   }
 
   @Test
@@ -271,7 +274,8 @@ class LogValidatorTest {
     assertEquals("Offset of max timestamp should be 2", 2, 
validatingResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", 
validatingResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 3, 
records, compressed = true)
+    verifyRecordsProcessingStats(validatingResults.recordsProcessingStats, 
numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -314,7 +318,8 @@ class LogValidatorTest {
       validatedRecords.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", 
validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, 
records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 
numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -354,7 +359,8 @@ class LogValidatorTest {
       validatedRecords.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertTrue("Message size should have been changed", 
validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 3, 
records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 
numConvertedRecords = 3, records,
+      compressed = true)
   }
 
   @Test
@@ -414,7 +420,8 @@ class LogValidatorTest {
       validatedRecords.records.asScala.size - 1, 
validatedResults.shallowOffsetOfMaxTimestamp)
     assertFalse("Message size should not have been changed", 
validatedResults.messageSizeMaybeChanged)
 
-    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 0, 
records, compressed = true)
+    verifyRecordsProcessingStats(validatedResults.recordsProcessingStats, 
numConvertedRecords = 0, records,
+      compressed = true)
   }
 
   @Test
@@ -1066,17 +1073,17 @@ class LogValidatorTest {
     }
   }
 
-  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, 
convertedCount: Int,
-            records: MemoryRecords, compressed: Boolean): Unit = {
+  def verifyRecordsProcessingStats(stats: RecordsProcessingStats, 
numConvertedRecords: Int, records: MemoryRecords,
+                                   compressed: Boolean): Unit = {
     assertNotNull("Records processing info is null", stats)
-    assertEquals(convertedCount, stats.conversionCount)
-    if (stats.conversionCount > 0)
+    assertEquals(numConvertedRecords, stats.numRecordsConverted)
+    if (numConvertedRecords > 0)
       assertTrue(s"Conversion time not recorded $stats", 
stats.conversionTimeNanos > 0)
     val originalSize = records.sizeInBytes
     val tempBytes = stats.temporaryMemoryBytes
-    if (convertedCount > 0)
+    if (numConvertedRecords > 0 && compressed)
       assertTrue(s"Temp bytes too small, orig=$originalSize 
actual=$tempBytes", tempBytes > originalSize)
-    else if (compressed)
+    else if (numConvertedRecords > 0 || compressed)
       assertTrue("Temp bytes not updated", tempBytes > 0)
     else
       assertEquals(0, tempBytes)

Reply via email to