Repository: kafka
Updated Branches:
  refs/heads/trunk ce1cb329d -> 6d6c77a7a


http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index eddb47a..8854c3a 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -312,7 +312,7 @@ class LogSegment(val log: FileRecords,
     if (ms == null) {
       baseOffset
     } else {
-      ms.records.shallowEntries.asScala.toSeq.lastOption match {
+      ms.records.shallowEntries.asScala.lastOption match {
         case None => baseOffset
         case Some(last) => last.nextOffset
       }
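
The change above drops a redundant toSeq: shallowEntries.asScala already yields a scala.collection.Iterable, which supports lastOption directly. A minimal, self-contained sketch with toy values (not from the patch):

    import scala.collection.JavaConverters._

    val entries: java.lang.Iterable[String] = java.util.Arrays.asList("a", "b", "c")
    // lastOption is available on the converted Iterable; no intermediate Seq copy is needed
    val last: Option[String] = entries.asScala.lastOption  // Some("c")
    println(last)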

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index d99c2ad..224a792 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -64,76 +64,8 @@ private[kafka] object LogValidator {
        assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
           messageTimestampDiffMaxMs)
     } else {
-      // Deal with compressed messages
-      // We cannot do in place assignment in one of the following situations:
-      // 1. Source and target compression codec are different
-      // 2. When magic value to use is 0 because offsets need to be overwritten
-      // 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
-      // 4. Message format conversion is needed.
-
-      // No in place assignment situation 1 and 2
-      var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
-
-      var maxTimestamp = Record.NO_TIMESTAMP
-      val expectedInnerOffset = new LongRef(0)
-      val validatedRecords = new mutable.ArrayBuffer[Record]
-
-      records.deepEntries(true).asScala.foreach { logEntry =>
-        val record = logEntry.record
-        validateKey(record, compactedTopic)
-
-        if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
-          // No in place assignment situation 3
-          // Validate the timestamp
-          validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
-          // Check if we need to overwrite offset
-          if (logEntry.offset != expectedInnerOffset.getAndIncrement())
-            inPlaceAssignment = false
-          if (record.timestamp > maxTimestamp)
-            maxTimestamp = record.timestamp
-        }
-
-        if (sourceCodec != NoCompressionCodec && logEntry.isCompressed)
-          throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
-            s"compression attribute set: $record")
-
-        // No in place assignment situation 4
-        if (record.magic != messageFormatVersion)
-          inPlaceAssignment = false
-
-        validatedRecords += record.convert(messageFormatVersion)
-      }
-
-      if (!inPlaceAssignment) {
-        val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record))
-        val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec),
-          now, entries.asJava)
-        builder.close()
-        val info = builder.info
-
-        ValidationAndOffsetAssignResult(
-          validatedRecords = builder.build(),
-          maxTimestamp = info.maxTimestamp,
-          shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
-          messageSizeMaybeChanged = true)
-      } else {
-        // ensure the inner messages are valid
-        validatedRecords.foreach(_.ensureValid)
-
-        // we can update the wrapper message only and write the compressed payload as is
-        val entry = records.shallowEntries.iterator.next()
-        val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
-        entry.setOffset(offset)
-        if (messageTimestampType == TimestampType.CREATE_TIME)
-          entry.setCreateTime(maxTimestamp)
-        else if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
-          entry.setLogAppendTime(now)
-
-        ValidationAndOffsetAssignResult(validatedRecords = records,
-          maxTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp,
-          shallowOffsetOfMaxTimestamp = offset,
-          messageSizeMaybeChanged = false)
-      }
+      validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
+        messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
     }
   }
 
@@ -159,11 +91,10 @@ private[kafka] object LogValidator {
      builder.convertAndAppendWithOffset(offsetCounter.getAndIncrement(), record)
     }
 
-    builder.close()
+    val convertedRecords = builder.build()
     val info = builder.info
-
     ValidationAndOffsetAssignResult(
-      validatedRecords = builder.build(),
+      validatedRecords = convertedRecords,
       maxTimestamp = info.maxTimestamp,
       shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
       messageSizeMaybeChanged = true)
@@ -210,6 +141,87 @@ private[kafka] object LogValidator {
       messageSizeMaybeChanged = false)
   }
 
+  /**
+   * We cannot do in place assignment in one of the following situations:
+   * 1. Source and target compression codec are different
+   * 2. When magic value to use is 0 because offsets need to be overwritten
+   * 3. When magic value to use is above 0, but some fields of inner messages need to be overwritten.
+   * 4. Message format conversion is needed.
+   */
+  def validateMessagesAndAssignOffsetsCompressed(records: MemoryRecords,
+                                                 offsetCounter: LongRef,
+                                                 now: Long,
+                                                 sourceCodec: CompressionCodec,
+                                                 targetCodec: CompressionCodec,
+                                                 compactedTopic: Boolean = false,
+                                                 messageFormatVersion: Byte = Record.CURRENT_MAGIC_VALUE,
+                                                 messageTimestampType: TimestampType,
+                                                 messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+    // No in place assignment situation 1 and 2
+    var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Record.MAGIC_VALUE_V0
+
+    var maxTimestamp = Record.NO_TIMESTAMP
+    val expectedInnerOffset = new LongRef(0)
+    val validatedRecords = new mutable.ArrayBuffer[Record]
+
+    records.deepEntries(true).asScala.foreach { logEntry =>
+      val record = logEntry.record
+      validateKey(record, compactedTopic)
+
+      if (record.magic > Record.MAGIC_VALUE_V0 && messageFormatVersion > Record.MAGIC_VALUE_V0) {
+        // Validate the timestamp
+        validateTimestamp(record, now, messageTimestampType, messageTimestampDiffMaxMs)
+        // Check if we need to overwrite offset, no in place assignment situation 3
+        if (logEntry.offset != expectedInnerOffset.getAndIncrement())
+          inPlaceAssignment = false
+        if (record.timestamp > maxTimestamp)
+          maxTimestamp = record.timestamp
+      }
+
+      if (sourceCodec != NoCompressionCodec && logEntry.isCompressed)
+        throw new InvalidMessageException("Compressed outer record should not have an inner record with a " +
+          s"compression attribute set: $record")
+
+      // No in place assignment situation 4
+      if (record.magic != messageFormatVersion)
+        inPlaceAssignment = false
+
+      validatedRecords += record.convert(messageFormatVersion)
+    }
+
+    if (!inPlaceAssignment) {
+      val entries = validatedRecords.map(record => LogEntry.create(offsetCounter.getAndIncrement(), record))
+      val builder = MemoryRecords.builderWithEntries(messageTimestampType, CompressionType.forId(targetCodec.codec),
+        now, entries.asJava)
+      val updatedRecords = builder.build()
+      val info = builder.info
+      ValidationAndOffsetAssignResult(
+        validatedRecords = updatedRecords,
+        maxTimestamp = info.maxTimestamp,
+        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp,
+        messageSizeMaybeChanged = true)
+    } else {
+      // ensure the inner messages are valid
+      validatedRecords.foreach(_.ensureValid)
+
+      // we can update the wrapper message only and write the compressed payload as is
+      val entry = records.shallowEntries.iterator.next()
+      val offset = offsetCounter.addAndGet(validatedRecords.size) - 1
+      entry.setOffset(offset)
+
+      val shallowTimestamp = if (messageTimestampType == TimestampType.LOG_APPEND_TIME) now else maxTimestamp
+      if (messageTimestampType == TimestampType.LOG_APPEND_TIME)
+        entry.setLogAppendTime(shallowTimestamp)
+      else if (messageTimestampType == TimestampType.CREATE_TIME)
+        entry.setCreateTime(shallowTimestamp)
+
+      ValidationAndOffsetAssignResult(validatedRecords = records,
+        maxTimestamp = shallowTimestamp,
+        shallowOffsetOfMaxTimestamp = offset,
+        messageSizeMaybeChanged = false)
+    }
+  }
+
   private def validateKey(record: Record, compactedTopic: Boolean) {
     if (compactedTopic && !record.hasKey)
      throw new InvalidMessageException("Compacted topic cannot accept message without key.")
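
A hedged sketch of the offset bookkeeping in the extracted method above: offsetCounter is a mutable LongRef, and the in-place branch stamps the wrapper with addAndGet(n) - 1, i.e. the offset of the last inner record. Counter semantics are assumed to mirror AtomicLong, and the package location of LongRef is assumed:

    import kafka.common.LongRef

    val offsetCounter = new LongRef(100L)
    val first = offsetCounter.getAndIncrement()        // 100; the counter is now 101
    val wrapperOffset = offsetCounter.addAndGet(3) - 1 // 103: last of the three offsets 101, 102, 103
    println((first, wrapperOffset))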

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 175b7e9..e0efb3d 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -99,8 +99,8 @@ object Message {
 
 
   def fromRecord(record: Record): Message = {
-    val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp() == null) None else Some(record.wrapperRecordTimestamp())
-    val wrapperTimestampType = Option(record.wrapperRecordTimestampType())
+    val wrapperTimestamp: Option[Long] = if (record.wrapperRecordTimestamp == null) None else Some(record.wrapperRecordTimestamp)
+    val wrapperTimestampType = Option(record.wrapperRecordTimestampType)
     new Message(record.buffer, wrapperTimestamp, wrapperTimestampType)
   }
 }
@@ -139,13 +139,9 @@ class Message(val buffer: ByteBuffer,
   
   import kafka.message.Message._
 
-  private[message] def asRecord: Record = {
-    wrapperMessageTimestamp match {
-      case None => new Record(buffer)
-      case Some(timestamp) =>
-        val timestampType = wrapperMessageTimestampType.orNull
-        new Record(buffer, timestamp, timestampType)
-    }
+  private[message] def asRecord: Record = wrapperMessageTimestamp match {
+    case None => new Record(buffer)
+    case Some(timestamp) => new Record(buffer, timestamp, wrapperMessageTimestampType.orNull)
   }
 
   /**
@@ -227,7 +223,7 @@ class Message(val buffer: ByteBuffer,
    * Compute the checksum of the message from the message contents
    */
   def computeChecksum: Long =
-    CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset,  buffer.limit - MagicOffset)
+    Utils.computeChecksum(buffer, MagicOffset, buffer.limit - MagicOffset)
   
   /**
    * Retrieve the previously computed CRC for this message

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1dbd373..ec25700 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -145,7 +145,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.NONE =>
                   try {
                     val records = partitionData.toRecords
-                    val newOffset = records.shallowEntries.asScala.toSeq.lastOption.map(_.nextOffset).getOrElse(
+                    val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(
                      currentPartitionFetchState.offset)

                    fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 0e05cce..1933339 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -36,8 +36,6 @@ case class TopicPartitionOperationKey(topic: String, partition: Int) extends Del
 
  def this(topicPartition: TopicPartition) = this(topicPartition.topic, topicPartition.partition)
 
-  def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
-
   override def keyLabel = "%s-%d".format(topic, partition)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index bbddfae..d78021f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -427,7 +427,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
              val controlledShutdownRequest = new ControlledShutdownRequest(config.brokerId)
              val request = new ClientRequest(node(prevController).idString, time.milliseconds(), true,
                requestHeader, controlledShutdownRequest, null)
-              val clientResponse = networkClient.blockingSendAndReceive(request, controlledShutdownRequest)(time)
+              val clientResponse = networkClient.blockingSendAndReceive(request)(time)

              val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
              if (shutdownResponse.errorCode == Errors.NONE.code && shutdownResponse.partitionsRemaining.isEmpty) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala 
b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
index 7067b20..05e9842 100644
--- a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -25,7 +25,7 @@ object LogOffsetMetadata {
   val UnknownFilePosition = -1
 
   class OffsetOrdering extends Ordering[LogOffsetMetadata] {
-    override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = {
+    override def compare(x: LogOffsetMetadata, y: LogOffsetMetadata): Int = {
       x.offsetDiff(y).toInt
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index d5d7a13..3811be3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -246,7 +246,7 @@ class ReplicaFetcherThread(name: String,
        throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
      else {
        val clientRequest = new ClientRequest(sourceBroker.id.toString, time.milliseconds(), true, header, request, null)
-        networkClient.blockingSendAndReceive(clientRequest, request)(time)
+        networkClient.blockingSendAndReceive(clientRequest)(time)
       }
     }
     catch {
@@ -260,7 +260,7 @@ class ReplicaFetcherThread(name: String,
  private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = {
     val (request, apiVersion) =
       if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
-        val partitions = Map(topicPartition -> java.lang.Long.valueOf(earliestOrLatest))
+        val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
         (new ListOffsetRequest(partitions.asJava, consumerId), 1)
       } else {
        val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
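
The ListOffsetRequest change above leans on Scala's numeric boxing: ascribing the value to java.lang.Long applies Predef's implicit Long-to-java.lang.Long conversion, so the explicit valueOf call is redundant. A standalone sketch (the sentinel value is illustrative):

    val earliestOrLatest: Long = -1L
    // The ascription (x: java.lang.Long) goes through Predef.long2Long, same result as java.lang.Long.valueOf(x)
    val boxed: java.lang.Long = (earliestOrLatest: java.lang.Long)
    println(boxed)  // -1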

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 859a7c4..87b8d90 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -26,7 +26,6 @@ import kafka.cluster.{Partition, Replica}
 import kafka.common._
 import kafka.controller.KafkaController
 import kafka.log.{LogAppendInfo, LogManager}
-import kafka.message.InvalidMessageException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
@@ -217,7 +216,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   def startup() {
     // start ISR expiration thread
-    // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x (1 + 50%) before it is removed from ISR
+    // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
   }
@@ -418,8 +417,6 @@ class ReplicaManager(val config: KafkaConfig,
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
-                   _: InvalidRecordException |
-                   _: InvalidMessageException |
                    _: InvalidTimestampException) =>
            (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
           case t: Throwable =>
@@ -667,7 +664,7 @@ class ReplicaManager(val config: KafkaConfig,
         val partitionState = new mutable.HashMap[Partition, PartitionState]()
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
           val partition = getOrCreatePartition(topicPartition)
-          val partitionLeaderEpoch = partition.getLeaderEpoch()
+          val partitionLeaderEpoch = partition.getLeaderEpoch
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
           if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
@@ -831,16 +828,16 @@ class ReplicaManager(val config: KafkaConfig,
               partitionsToMakeFollower += partition
             else
              stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
-                "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
+                "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
                .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
-                partition.topic, partition.partitionId, newLeaderBrokerId))
+                partition.topicPartition, newLeaderBrokerId))
           case None =>
            // The leader broker should always be present in the metadata cache.
            // If not, we should record the error message and abort the transition process for this partition
            stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
-              " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
+              " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
              .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
-              partition.topic, partition.partitionId, newLeaderBrokerId))
+              partition.topicPartition, newLeaderBrokerId))
            // Create the local replica even if the leader is unavailable. This is required to ensure that we include
            // the partition's high watermark in the checkpoint file (see KAFKA-1647)
             partition.getOrCreateReplica()
@@ -858,22 +855,22 @@ class ReplicaManager(val config: KafkaConfig,
        (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
       }.toMap)
       partitionsToMakeFollower.foreach { partition =>
-        val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId)
+        val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
         tryCompleteDelayedProduce(topicPartitionOperationKey)
         tryCompleteDelayedFetch(topicPartitionOperationKey)
       }
 
       partitionsToMakeFollower.foreach { partition =>
-        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
+        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
          "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
-          partition.topic, partition.partitionId, correlationId, controllerId, epoch))
+          partition.topicPartition, correlationId, controllerId, epoch))
       }
 
       if (isShuttingDown.get()) {
         partitionsToMakeFollower.foreach { partition =>
          stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
-            "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
-            controllerId, epoch, partition.topic, partition.partitionId))
+            "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
+            controllerId, epoch, partition.topicPartition))
         }
       }
       else {
@@ -886,8 +883,8 @@ class ReplicaManager(val config: KafkaConfig,
 
         partitionsToMakeFollower.foreach { partition =>
          stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
-            "%d epoch %d with correlation id %d for partition [%s,%d]")
-            .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
+            "%d epoch %d with correlation id %d for partition %s")
+            .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
         }
       }
     } catch {
@@ -930,7 +927,7 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   private def getLeaderPartitions() : List[Partition] = {
-    allPartitions.values.filter(_.leaderReplicaIfLocal().isDefined).toList
+    allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
   }
 
  // Flushes the highwatermark value for all partitions to the highwatermark file
@@ -944,7 +941,7 @@ class ReplicaManager(val config: KafkaConfig,
       } catch {
         case e: IOException =>
           fatal("Error writing to highwatermark file: ", e)
-          Runtime.getRuntime().halt(1)
+          Runtime.getRuntime.halt(1)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 99b5aae..7fe9cc9 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -31,10 +31,8 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection._
 import scala.collection.mutable
 import kafka.cluster.EndPoint
-import org.apache.kafka.common.utils.Crc32
 import org.apache.kafka.common.utils.Utils
 
-
 /**
  * General helper functions!
  *
@@ -130,26 +128,6 @@ object CoreUtils extends Logging {
   }
 
   /**
-   * Compute the CRC32 of the byte array
-   * @param bytes The array to compute the checksum for
-   * @return The CRC32
-   */
-  def crc32(bytes: Array[Byte]): Long = crc32(bytes, 0, bytes.length)
-
-  /**
-   * Compute the CRC32 of the segment of the byte array given by the specified size and offset
-   * @param bytes The bytes to checksum
-   * @param offset the offset at which to begin checksumming
-   * @param size the number of bytes to checksum
-   * @return The CRC32
-   */
-  def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = {
-    val crc = new Crc32()
-    crc.update(bytes, offset, size)
-    crc.getValue()
-  }
-
-  /**
    * Read some bytes into the provided buffer, and return the number of bytes read. If the
    * channel has been closed or we get -1 on the read for any reason, throw an EOFException
    */
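
For reference, the deleted CoreUtils.crc32 helpers computed a standard CRC-32 over a byte-array slice; java.util.zip.CRC32 produces the same value (equivalence assumed, not stated in this patch), which gives a quick way to sanity-check callers that moved to the Record/Utils checksum path:

    import java.util.zip.CRC32

    // Same contract as the removed helper: checksum `size` bytes starting at `offset`
    def crc32(bytes: Array[Byte], offset: Int, size: Int): Long = {
      val crc = new CRC32()
      crc.update(bytes, offset, size)
      crc.getValue
    }

    println(crc32("kafka".getBytes("UTF-8"), 0, 5))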

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
index e3d389b..62e7d94 100644
--- a/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
+++ b/core/src/main/scala/kafka/utils/NetworkClientBlockingOps.scala
@@ -103,7 +103,7 @@ class NetworkClientBlockingOps(val client: NetworkClient) extends AnyVal {
    * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`, use it with
    * care.
    */
-  def blockingSendAndReceive(request: ClientRequest, body: AbstractRequest)(implicit time: Time): ClientResponse = {
+  def blockingSendAndReceive(request: ClientRequest)(implicit time: Time): ClientResponse = {
     client.send(request, time.milliseconds())
 
     pollContinuously { responses =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/main/scala/kafka/utils/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
index 0cf6474..df74f29 100644
--- a/core/src/main/scala/kafka/utils/Pool.scala
+++ b/core/src/main/scala/kafka/utils/Pool.scala
@@ -33,9 +33,9 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
     m.foreach(kv => pool.put(kv._1, kv._2))
   }
   
-  def put(k: K, v: V) = pool.put(k, v)
+  def put(k: K, v: V): V = pool.put(k, v)
   
-  def putIfNotExists(k: K, v: V) = pool.putIfAbsent(k, v)
+  def putIfNotExists(k: K, v: V): V = pool.putIfAbsent(k, v)
 
   /**
    * Gets the value associated with the given key. If there is no associated
@@ -44,27 +44,40 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
    * as lazy if its side-effects need to be avoided.
    *
    * @param key The key to lookup.
-   * @return The final value associated with the key. This may be different from
-   *         the value created by the factory if another thread successfully
-   *         put a value.
+   * @return The final value associated with the key.
    */
-  def getAndMaybePut(key: K) = {
+  def getAndMaybePut(key: K): V = {
     if (valueFactory.isEmpty)
       throw new KafkaException("Empty value factory in pool.")
-    val curr = pool.get(key)
-    if (curr == null) {
+    getAndMaybePut(key, valueFactory.get(key))
+  }
+
+  /**
+    * Gets the value associated with the given key. If there is no associated
+    * value, then create the value using the provided by `createValue` and return the
+    * value associated with the key.
+    *
+    * @param key The key to lookup.
+    * @param createValue Factory function.
+    * @return The final value associated with the key.
+    */
+  def getAndMaybePut(key: K, createValue: => V): V = {
+    val current = pool.get(key)
+    if (current == null) {
       createLock synchronized {
-        val curr = pool.get(key)
-        if (curr == null)
-          pool.put(key, valueFactory.get(key))
-        pool.get(key)
+        val current = pool.get(key)
+        if (current == null) {
+          val value = createValue
+          pool.put(key, value)
+          value
+        }
+        else current
       }
     }
-    else
-      curr
+    else current
   }
 
-  def contains(id: K) = pool.containsKey(id)
+  def contains(id: K): Boolean = pool.containsKey(id)
   
   def get(key: K): V = pool.get(key)
   
@@ -78,9 +91,9 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
 
   def clear() { pool.clear() }
   
-  override def size = pool.size
+  override def size: Int = pool.size
   
-  override def iterator = new Iterator[(K,V)]() {
+  override def iterator: Iterator[(K, V)] = new Iterator[(K,V)]() {
     
     private val iter = pool.entrySet.iterator
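
A hedged usage sketch of the new getAndMaybePut overload added above: createValue is a by-name parameter, so the factory expression only runs when the key is absent. The key and value types below are illustrative, not from the patch:

    import kafka.utils.Pool

    val pool = new Pool[String, StringBuilder]()
    val first = pool.getAndMaybePut("topic-0", new StringBuilder("created"))
    // The by-name argument is never evaluated here because "topic-0" is already present
    val second = pool.getAndMaybePut("topic-0", sys.error("factory should not run"))
    assert(first eq second)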
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index bb93cb4..6381447 100644
--- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -15,7 +15,6 @@ package integration.kafka.api
 import kafka.common.Topic
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.Log
-import kafka.message.GZIPCompressionCodec
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
@@ -27,11 +26,13 @@ import org.junit.Assert._
 import scala.collection.JavaConverters._
 import java.util.Properties
 
+import org.apache.kafka.common.record.CompressionType
+
 class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
-  val offsetsTopicCompressionCodec = GZIPCompressionCodec
+  val offsetsTopicCompressionCodec = CompressionType.GZIP
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)
+  overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.id.toString)
 
  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map {
     KafkaConfig.fromProps(_, overridingProps)
@@ -55,8 +56,8 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
 
     val logSegments = getGroupMetadataLogOpt.get.logSegments
     val incorrectCompressionCodecs = logSegments
-      .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType.id))
-      .filter(_ != offsetsTopicCompressionCodec.codec)
+      .flatMap(_.log.shallowEntries.asScala.map(_.record.compressionType))
+      .filter(_ != offsetsTopicCompressionCodec)
    assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
 
     consumer.close()
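
The test now drives the offsets topic compression through org.apache.kafka.common.record.CompressionType instead of the old kafka.message codec object; the numeric ids are expected to line up (gzip = 1, value assumed), so the property written to the broker config is unchanged:

    import org.apache.kafka.common.record.CompressionType

    // Assumed id mapping in the common record classes: NONE = 0, GZIP = 1, SNAPPY = 2, LZ4 = 3
    println(CompressionType.GZIP.id)  // 1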

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 83280dc..ee556d7 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -29,8 +29,6 @@ import kafka.integration.KafkaServerTestHarness
 import org.junit.{After, Before}
 
 import scala.collection.mutable.Buffer
-import scala.util.control.Breaks.{breakable, break}
-import java.util.ConcurrentModificationException
 
 /**
 * A helper class for writing integration tests that involve producers, consumers, and servers

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index ef2b0af..b7d2fa1 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -18,7 +18,8 @@
 package kafka.message
 
 import java.nio.ByteBuffer
-import java.nio.channels.GatheringByteChannel
+import java.nio.channels.{FileChannel, GatheringByteChannel}
+import java.nio.file.StandardOpenOption
 
 import org.junit.Assert._
 import kafka.utils.TestUtils._
@@ -118,14 +119,13 @@ trait BaseMessageSetTestCases extends JUnitSuite {
      // do the write twice to ensure the message set is restored to its original state
     for (_ <- 0 to 1) {
       val file = tempFile()
-      val fileRecords = FileRecords.open(file, true)
+      val channel = FileChannel.open(file.toPath, StandardOpenOption.READ, StandardOpenOption.WRITE)
       try {
-        val written = write(fileRecords.channel)
-        fileRecords.resize() // resize since we wrote to the channel directly
-
+        val written = write(channel)
        assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, written)
+        val fileRecords = new FileRecords(file, channel, 0, Integer.MAX_VALUE, false)
        assertEquals(set.asRecords.deepEntries.asScala.toVector, fileRecords.deepEntries.asScala.toVector)
-      } finally fileRecords.close()
+      } finally channel.close()
     }
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 0a03cac..348bfc3 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -75,11 +75,11 @@ class IsrExpirationTest {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
     // let the follower catch up to the Leader logEndOffset (15)
-    for(replica <- partition0.assignedReplicas() - leaderReplica)
+    for (replica <- partition0.assignedReplicas - leaderReplica)
      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
                                                    hw = 15L,
                                                    leaderLogEndOffset = 15L,
-                                                    fetchTimeMs =time.milliseconds,
+                                                    fetchTimeMs = time.milliseconds,
                                                    readSize = -1))
    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
@@ -127,11 +127,11 @@ class IsrExpirationTest {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
    // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
-    for(replica <- partition0.assignedReplicas() - leaderReplica)
+    for (replica <- partition0.assignedReplicas - leaderReplica)
      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
                                                    hw = 10L,
                                                    leaderLogEndOffset = 15L,
-                                                    fetchTimeMs =time.milliseconds,
+                                                    fetchTimeMs = time.milliseconds,
                                                     readSize = -1))
 
    // Simulate 2 fetch requests spanning more than 100 ms which do not read to the end of the log.
@@ -141,12 +141,13 @@ class IsrExpirationTest {
 
     time.sleep(75)
 
-    (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
-                                                   hw = 11L,
-                                                   leaderLogEndOffset = 15L,
-                                                   fetchTimeMs =time.milliseconds,
-                                                   readSize = -1)))
+    (partition0.assignedReplicas - leaderReplica).foreach { r =>
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
+                            hw = 11L,
+                            leaderLogEndOffset = 15L,
+                            fetchTimeMs = time.milliseconds,
+                            readSize = -1))
+    }
    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -157,12 +158,13 @@ class IsrExpirationTest {
    assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
 
    // Now actually make a fetch to the end of the log. The replicas should be back in ISR
-    (partition0.assignedReplicas() - leaderReplica).foreach(
-      r => r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                                                   hw = 15L,
-                                                   leaderLogEndOffset = 15L,
-                                                   fetchTimeMs =time.milliseconds,
-                                                   readSize = -1)))
+    (partition0.assignedReplicas - leaderReplica).foreach { r =>
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
+                            hw = 15L,
+                            leaderLogEndOffset = 15L,
+                            fetchTimeMs = time.milliseconds,
+                            readSize = -1))
+    }
    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -180,7 +182,7 @@ class IsrExpirationTest {
     // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
     // set lastCaughtUpTime to current time
-    for(replica <- partition.assignedReplicas() - leaderReplica)
+    for (replica <- partition.assignedReplicas - leaderReplica)
      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY),
                                                     hw = 0L,
                                                     leaderLogEndOffset = 0L,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1ce17dc..cffb04d 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -32,7 +32,6 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.easymock.EasyMock
 import org.junit.Assert._
-import scala.collection.JavaConverters._
 
 class SimpleFetchTest {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6c77a7/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 2d9b9a5..ac83f1c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -219,7 +219,7 @@ public class InternalTopicIntegrationTest {
         assertTrue(policies.contains(LogConfig.Compact()));
         assertTrue(policies.contains(LogConfig.Delete()));
         // retention should be 1 day + the window duration
-        final Long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
-        assertEquals(retention, Long.valueOf(properties.getProperty(LogConfig.RetentionMsProp())));
+        final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
+        assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
     }
 }
