Repository: kafka
Updated Branches:
  refs/heads/trunk c62dc28ce -> 2959bc2ad


MINOR: Replaced unnecessary isDefined and get on option values with fold

Author: himani1 <1himani.ar...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2050 from himani1/refactored_code


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2959bc2a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2959bc2a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2959bc2a

Branch: refs/heads/trunk
Commit: 2959bc2ad382fa786bd6209fe742b8218edfa4a8
Parents: c62dc28
Author: himani1 <1himani.ar...@gmail.com>
Authored: Tue Nov 1 22:20:19 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Nov 1 22:20:19 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/api/TopicMetadata.scala    |  6 ++--
 .../kafka/network/RequestOrResponseSend.scala   |  2 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 38 +++++++++++---------
 3 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2959bc2a/core/src/main/scala/kafka/api/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala 
b/core/src/main/scala/kafka/api/TopicMetadata.scala
index 815de21..1be5fbc 100644
--- a/core/src/main/scala/kafka/api/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadata.scala
@@ -126,7 +126,7 @@ case class PartitionMetadata(partitionId: Int,
     buffer.putInt(partitionId)
 
     /* leader */
-    val leaderId = if(leader.isDefined) leader.get.id else 
TopicMetadata.NoLeaderNodeId
+    val leaderId = leader.fold(TopicMetadata.NoLeaderNodeId)(leader => 
leader.id)
     buffer.putInt(leaderId)
 
     /* number of replicas */
@@ -141,10 +141,10 @@ case class PartitionMetadata(partitionId: Int,
   override def toString: String = {
     val partitionMetadataString = new StringBuilder
     partitionMetadataString.append("\tpartition " + partitionId)
-    partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none"))
+    partitionMetadataString.append("\tleader: " + leader.getOrElse("none"))
     partitionMetadataString.append("\treplicas: " + replicas.mkString(","))
     partitionMetadataString.append("\tisr: " + isr.mkString(","))
-    partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false"))
+    partitionMetadataString.append("\tisUnderReplicated: " + (isr.size < replicas.size)) // Boolean concatenates as "true"/"false"; no %s placeholder without format
     partitionMetadataString.toString()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2959bc2a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala 
b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
index 153d636..1bfbf53 100644
--- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
+++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.network.NetworkSend
 
 object RequestOrResponseSend {
   def serialize(request: RequestOrResponse): ByteBuffer = {
-    val buffer = ByteBuffer.allocate(request.sizeInBytes + 
(if(request.requestId.isDefined) 2 else 0))
+    val buffer = ByteBuffer.allocate(request.sizeInBytes + 
request.requestId.fold(0)(_ => 2))
     request.requestId match {
       case Some(requestId) =>
         buffer.putShort(requestId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2959bc2a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index c299676..221ef6c 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -187,23 +187,27 @@ object DumpLogSegments {
       var maxTimestamp = Message.NoTimestamp
       // We first find the message by offset then check if the timestamp is 
correct.
       val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + 
timeIndex.baseOffset)
-      if (!wrapperMessageOpt.isDefined || wrapperMessageOpt.get.offset != 
entry.offset + timeIndex.baseOffset) {
-        timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + 
timeIndex.baseOffset,
-          {if (wrapperMessageOpt.isDefined) wrapperMessageOpt.get.offset else 
-1})
-      } else {
-        val deepIter = getIterator(wrapperMessageOpt.get, isDeepIteration = 
true)
-        for (messageAndOffset <- deepIter)
-          maxTimestamp = math.max(maxTimestamp, 
messageAndOffset.message.timestamp)
-
-        if (maxTimestamp != entry.timestamp)
-          timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, 
maxTimestamp)
-
-        if (prevTimestamp >= entry.timestamp)
-          timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, 
entry.timestamp, prevTimestamp)
-
-        // since it is a sparse file, in the event of a crash there may be 
many zero entries, stop if we see one
-        if (entry.offset == 0 && i > 0)
-          return
+      wrapperMessageOpt match {
+        case None =>
+          timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + 
timeIndex.baseOffset,
+            -1.toLong)
+        case Some(wrapperMessage) if wrapperMessage.offset != entry.offset + 
timeIndex.baseOffset =>
+          timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + 
timeIndex.baseOffset,
+            wrapperMessage.offset)
+        case Some(wrapperMessage) =>
+          val deepIter = getIterator(wrapperMessage, isDeepIteration = true)
+          for (messageAndOffset <- deepIter)
+            maxTimestamp = math.max(maxTimestamp, 
messageAndOffset.message.timestamp)
+
+          if (maxTimestamp != entry.timestamp)
+            timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, 
maxTimestamp)
+
+          if (prevTimestamp >= entry.timestamp)
+            timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, 
entry.timestamp, prevTimestamp)
+
+          // since it is a sparse file, in the event of a crash there may be 
many zero entries, stop if we see one
+          if (entry.offset == 0 && i > 0)
+            return
       }
       if (!verifyOnly)
         println("timestamp: %s offset: %s".format(entry.timestamp, 
timeIndex.baseOffset + entry.offset))

Reply via email to