Updated Branches:
  refs/heads/0.8 3817857b1 -> d3aa3ef07

KAFKA-879: In the system test, read the new leader from ZooKeeper instead of
from the broker log line written on completion of the become-leader state
transition; patched by John Fung; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d3aa3ef0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d3aa3ef0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d3aa3ef0

Branch: refs/heads/0.8
Commit: d3aa3ef073fe773a10168f925a45747f52c4e3c0
Parents: 3817857
Author: John Fung <[email protected]>
Authored: Tue Jul 23 09:38:52 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Tue Jul 23 09:38:52 2013 -0700

----------------------------------------------------------------------
 .../replication_testsuite/replica_basic_test.py | 36 ++++++-------
 system_test/utils/kafka_system_test_utils.py    | 57 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d3aa3ef0/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py 
b/system_test/replication_testsuite/replica_basic_test.py
index 40c1157..17414ad 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -231,7 +231,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                     # ==============================================
                     if brokerType == "leader" or brokerType == "follower":
                         self.log_message("looking up leader")
-                        leaderDict = 
kafka_system_test_utils.get_leader_elected_log_line(self.systemTestEnv, 
self.testcaseEnv, self.leaderAttributesDict)
+                        leaderDict = 
kafka_system_test_utils.get_leader_attributes(self.systemTestEnv, 
self.testcaseEnv)
 
                         # ==========================
                         # leaderDict looks like this:
@@ -285,10 +285,10 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                             
kafka_system_test_utils.validate_leader_election_successful(self.testcaseEnv, 
leaderDict, self.testcaseEnv.validationStatusDict)
                 
                             # trigger leader re-election by stopping leader to 
get re-election latency
-                            reelectionLatency = 
kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, 
self.testcaseEnv, leaderDict, self.leaderAttributesDict)
-                            latencyKeyName = "Leader Election Latency - iter " 
+ str(i) + " brokerid " + leaderDict["brokerid"]
-                            
self.testcaseEnv.validationStatusDict[latencyKeyName] = 
str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
-                            
self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency
 * 1000))
+                            #reelectionLatency = 
kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, 
self.testcaseEnv, leaderDict, self.leaderAttributesDict)
+                            #latencyKeyName = "Leader Election Latency - iter 
" + str(i) + " brokerid " + leaderDict["brokerid"]
+                            
#self.testcaseEnv.validationStatusDict[latencyKeyName] = 
str("{0:.2f}".format(reelectionLatency * 1000)) + " ms"
+                            
#self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"].append("{0:.2f}".format(reelectionLatency
 * 1000))
 
                         elif brokerType == "follower":
                             # stopping Follower
@@ -330,19 +330,19 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                 # while loop
 
                 # update Leader Election Latency MIN/MAX to 
testcaseEnv.validationStatusDict
-                self.testcaseEnv.validationStatusDict["Leader Election Latency 
MIN"] = None
-                try:
-                    self.testcaseEnv.validationStatusDict["Leader Election 
Latency MIN"] = \
-                        
min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
-                except:
-                    pass
-
-                self.testcaseEnv.validationStatusDict["Leader Election Latency 
MAX"] = None
-                try:
-                    self.testcaseEnv.validationStatusDict["Leader Election 
Latency MAX"] = \
-                        
max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
-                except:
-                    pass
+                #self.testcaseEnv.validationStatusDict["Leader Election 
Latency MIN"] = None
+                #try:
+                #    self.testcaseEnv.validationStatusDict["Leader Election 
Latency MIN"] = \
+                #        
min(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+                #except:
+                #    pass
+                #
+                #self.testcaseEnv.validationStatusDict["Leader Election 
Latency MAX"] = None
+                #try:
+                #    self.testcaseEnv.validationStatusDict["Leader Election 
Latency MAX"] = \
+                #        
max(self.testcaseEnv.userDefinedEnvVarDict["leaderElectionLatencyList"])
+                #except:
+                #    pass
 
                 # =============================================
                 # tell producer to stop

http://git-wip-us.apache.org/repos/asf/kafka/blob/d3aa3ef0/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py 
b/system_test/utils/kafka_system_test_utils.py
index ae393bc..de16a34 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -2211,3 +2211,60 @@ def validate_index_log(systemTestEnv, testcaseEnv, 
clusterName="source"):
     else:
         validationStatusDict["Validate index log in cluster [" + clusterName + 
"]"] = "FAILED"
 
+def get_leader_attributes(systemTestEnv, testcaseEnv):
+
+    logger.info("Querying Zookeeper for leader info ...", extra=d)
+
+    # keep track of leader data in this dict such as broker id & entity id
+    leaderDict = {} 
+
+    clusterConfigsList = systemTestEnv.clusterEntityConfigDictList
+    tcConfigsList      = testcaseEnv.testcaseConfigsList
+
+    zkDictList         = 
system_test_utils.get_dict_from_list_of_dicts(clusterConfigsList, "role", 
"zookeeper")
+    firstZkDict        = zkDictList[0]
+    hostname           = firstZkDict["hostname"]
+    zkEntityId         = firstZkDict["entity_id"]
+    clientPort         = 
system_test_utils.get_data_by_lookup_keyval(tcConfigsList, "entity_id", 
zkEntityId, "clientPort")
+    kafkaHome          = 
system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", 
zkEntityId, "kafka_home")
+    javaHome           = 
system_test_utils.get_data_by_lookup_keyval(clusterConfigsList, "entity_id", 
zkEntityId, "java_home")
+    kafkaRunClassBin   = kafkaHome + "/bin/kafka-run-class.sh"
+
+    # this should have been updated in start_producer_in_thread
+    producerTopicsString = testcaseEnv.producerTopicsString
+    topics = producerTopicsString.split(',')
+    zkQueryStr = "get /brokers/topics/" + topics[0] + "/partitions/0/state"
+    brokerid   = ''
+
+    cmdStrList = ["ssh " + hostname,
+                  "\"JAVA_HOME=" + javaHome,
+                  kafkaRunClassBin + " org.apache.zookeeper.ZooKeeperMain",
+                  "-server " + 
testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"],
+                  zkQueryStr + " 2> /dev/null | tail -1\""]
+    cmdStr = " ".join(cmdStrList)
+    logger.debug("executing command [" + cmdStr + "]", extra=d)
+
+    subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+    for line in subproc.stdout.readlines():
+        logger.debug("zk returned : " + line, extra=d)
+        if "\"leader\"" in line:
+            line = line.rstrip('\n')
+            json_data = json.loads(line)
+            for key,val in json_data.items():
+                if key == 'leader':
+                    brokerid = str(val)
+
+            leaderDict["brokerid"]  = brokerid
+            leaderDict["topic"]     = topics[0]
+            leaderDict["partition"] = '0'
+            leaderDict["entity_id"] = 
system_test_utils.get_data_by_lookup_keyval(
+                                          tcConfigsList, "broker.id", 
brokerid, "entity_id")
+            leaderDict["hostname"]  = 
system_test_utils.get_data_by_lookup_keyval(
+                                          clusterConfigsList, "entity_id", 
leaderDict["entity_id"], "hostname")
+            break
+
+    print leaderDict
+    return leaderDict
+
+
+

Reply via email to