[ https://issues.apache.org/jira/browse/KAFKA-1818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eric Olander updated KAFKA-1818:
--------------------------------
Status: Patch Available (was: Open)
From bb32b9bfbcc5e418cbb0b6b4e9f6aa39d5ce1345 Mon Sep 17 00:00:00 2001
From: Eric Olander <[email protected]>
Date: Sun, 14 Dec 2014 12:12:20 -0700
Subject: [PATCH] KAFKA-1818 clean up code to more idiomatic scala usage
---
.../main/scala/kafka/utils/ReplicationUtils.scala | 32 ++++++++--------------
.../unit/kafka/utils/ReplicationUtilsTest.scala | 10 +++++++
2 files changed, 22 insertions(+), 20 deletions(-)
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 7157673..4b642ea 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -61,30 +61,22 @@ object ReplicationUtils extends Logging {
def getLeaderIsrAndEpochForPartition(zkClient: ZkClient, topic: String,
partition: Int):Option[LeaderIsrAndControllerEpoch] = {
val leaderAndIsrPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic,
partition)
- val leaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient,
leaderAndIsrPath)
- val leaderAndIsrOpt = leaderAndIsrInfo._1
- val stat = leaderAndIsrInfo._2
- leaderAndIsrOpt match {
- case Some(leaderAndIsrStr) => parseLeaderAndIsr(leaderAndIsrStr,
leaderAndIsrPath, stat)
- case None => None
- }
+ val (leaderAndIsrOpt, stat) = ZkUtils.readDataMaybeNull(zkClient,
leaderAndIsrPath)
+ leaderAndIsrOpt.flatMap(leaderAndIsrStr =>
parseLeaderAndIsr(leaderAndIsrStr, leaderAndIsrPath, stat))
}
private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat:
Stat)
: Option[LeaderIsrAndControllerEpoch] = {
- Json.parseFull(leaderAndIsrStr) match {
- case Some(m) =>
- val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
- val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
- val epoch =
leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
- val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
- val controllerEpoch =
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
- val zkPathVersion = stat.getVersion
- debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for
leaderAndIsrPath %s".format(leader, epoch,
- isr.toString(), zkPathVersion, path))
- Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr,
zkPathVersion), controllerEpoch))
- case None => None
- }
+ Json.parseFull(leaderAndIsrStr).flatMap{m =>
+ val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, Any]]
+ val leader = leaderIsrAndEpochInfo.get("leader").get.asInstanceOf[Int]
+ val epoch =
leaderIsrAndEpochInfo.get("leader_epoch").get.asInstanceOf[Int]
+ val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
+ val controllerEpoch =
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
+ val zkPathVersion = stat.getVersion
+ debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for
leaderAndIsrPath %s".format(leader, epoch,
+ isr.toString(), zkPathVersion, path))
+ Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr,
zkPathVersion), controllerEpoch))}
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 84e0855..305498a 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,6 +17,7 @@
package kafka.utils
+import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.server.{ReplicaFetcherManager, KafkaConfig}
import kafka.api.LeaderAndIsr
import kafka.zk.ZooKeeperTestHarness
@@ -42,6 +43,8 @@ class ReplicationUtilsTest extends JUnit3Suite with
ZooKeeperTestHarness {
val topicDataMismatch = Json.encode(Map("controller_epoch" -> 1, "leader" ->
1,
"versions" -> 2, "leader_epoch" -> 2,"isr" -> List(1,2)))
+ val topicDataLeaderIsrAndControllerEpoch =
LeaderIsrAndControllerEpoch(LeaderAndIsr(1,leaderEpoch,List(1,2),0),
controllerEpoch)
+
override def setUp() {
super.setUp()
@@ -92,4 +95,11 @@ class ReplicationUtilsTest extends JUnit3Suite with
ZooKeeperTestHarness {
assertEquals(newZkVersion3,-1)
}
+ @Test
+ def testGetLeaderIsrAndEpochForPartition() {
+ val leaderIsrAndControllerEpoch =
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId)
+ assertEquals(topicDataLeaderIsrAndControllerEpoch,
leaderIsrAndControllerEpoch.get)
+ assertEquals(None,
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partitionId
+ 1))
+ }
+
}
--
1.8.0
> Code cleanup in ReplicationUtils including unit test
> ----------------------------------------------------
>
> Key: KAFKA-1818
> URL: https://issues.apache.org/jira/browse/KAFKA-1818
> Project: Kafka
> Issue Type: Improvement
> Components: replication
> Affects Versions: 0.8.1.1
> Reporter: Eric Olander
> Assignee: Neha Narkhede
> Priority: Trivial
>
> Code in getLeaderIsrAndEpochForPartition() and parseLeaderAndIsr() was
> essentially reimplementing the flatMap function on the Option type. The
> attached patch refactors that code to more idiomatic Scala and provides a
> unit test over the affected code.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)