Repository: kafka Updated Branches: refs/heads/trunk c62dc28ce -> 2959bc2ad
MINOR: Replaced unnecessary isDefined and get on option values with fold Author: himani1 <1himani.ar...@gmail.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #2050 from himani1/refactored_code Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2959bc2a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2959bc2a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2959bc2a Branch: refs/heads/trunk Commit: 2959bc2ad382fa786bd6209fe742b8218edfa4a8 Parents: c62dc28 Author: himani1 <1himani.ar...@gmail.com> Authored: Tue Nov 1 22:20:19 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Nov 1 22:20:19 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/api/TopicMetadata.scala | 6 ++-- .../kafka/network/RequestOrResponseSend.scala | 2 +- .../scala/kafka/tools/DumpLogSegments.scala | 38 +++++++++++--------- 3 files changed, 25 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2959bc2a/core/src/main/scala/kafka/api/TopicMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/TopicMetadata.scala b/core/src/main/scala/kafka/api/TopicMetadata.scala index 815de21..1be5fbc 100644 --- a/core/src/main/scala/kafka/api/TopicMetadata.scala +++ b/core/src/main/scala/kafka/api/TopicMetadata.scala @@ -126,7 +126,7 @@ case class PartitionMetadata(partitionId: Int, buffer.putInt(partitionId) /* leader */ - val leaderId = if(leader.isDefined) leader.get.id else TopicMetadata.NoLeaderNodeId + val leaderId = leader.fold(TopicMetadata.NoLeaderNodeId)(leader => leader.id) buffer.putInt(leaderId) /* number of replicas */ @@ -141,10 +141,10 @@ case class PartitionMetadata(partitionId: Int, override def toString: String = { val partitionMetadataString = new StringBuilder partitionMetadataString.append("\tpartition " + partitionId) - partitionMetadataString.append("\tleader: " + (if(leader.isDefined) leader.get.toString else "none")) + partitionMetadataString.append("\tleader: " + leader.getOrElse("none")) partitionMetadataString.append("\treplicas: " + replicas.mkString(",")) partitionMetadataString.append("\tisr: " + isr.mkString(",")) - partitionMetadataString.append("\tisUnderReplicated: %s".format(if(isr.size < replicas.size) "true" else "false")) + partitionMetadataString.append("\tisUnderReplicated: %s" + (isr.size < replicas.size)) partitionMetadataString.toString() } http://git-wip-us.apache.org/repos/asf/kafka/blob/2959bc2a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala index 153d636..1bfbf53 100644 --- a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala +++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala @@ -26,7 +26,7 @@ import org.apache.kafka.common.network.NetworkSend object RequestOrResponseSend { def serialize(request: RequestOrResponse): ByteBuffer = { - val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId.isDefined) 2 else 0)) + val buffer = ByteBuffer.allocate(request.sizeInBytes + request.requestId.fold(0)(_ => 2)) request.requestId match { case Some(requestId) => buffer.putShort(requestId) http://git-wip-us.apache.org/repos/asf/kafka/blob/2959bc2a/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index c299676..221ef6c 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -187,23 +187,27 @@ object DumpLogSegments { var maxTimestamp = Message.NoTimestamp // We first find the message by offset then check if the timestamp is correct. val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + timeIndex.baseOffset) - if (!wrapperMessageOpt.isDefined || wrapperMessageOpt.get.offset != entry.offset + timeIndex.baseOffset) { - timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, - {if (wrapperMessageOpt.isDefined) wrapperMessageOpt.get.offset else -1}) - } else { - val deepIter = getIterator(wrapperMessageOpt.get, isDeepIteration = true) - for (messageAndOffset <- deepIter) - maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp) - - if (maxTimestamp != entry.timestamp) - timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp) - - if (prevTimestamp >= entry.timestamp) - timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, entry.timestamp, prevTimestamp) - - // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one - if (entry.offset == 0 && i > 0) - return + wrapperMessageOpt match { + case None => + timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, + -1.toLong) + case Some(wrapperMessage) if wrapperMessage.offset != entry.offset + timeIndex.baseOffset => + timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset, + wrapperMessage.offset) + case Some(wrapperMessage) => + val deepIter = getIterator(wrapperMessage, isDeepIteration = true) + for (messageAndOffset <- deepIter) + maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp) + + if (maxTimestamp != entry.timestamp) + timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp) + + if (prevTimestamp >= entry.timestamp) + timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, entry.timestamp, prevTimestamp) + + // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one + if (entry.offset == 0 && i > 0) + return } if (!verifyOnly) println("timestamp: %s offset: %s".format(entry.timestamp, timeIndex.baseOffset + entry.offset))