Repository: kylin
Updated Branches:
  refs/heads/master b7ae177f4 -> 06e1ea296


KYLIN-1696: add catch exception during sending topic meta request to broker


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/06e1ea29
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/06e1ea29
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/06e1ea29

Branch: refs/heads/master
Commit: 06e1ea2960c2d0dffc9245fa52a0e281d72c846c
Parents: b7ae177
Author: kyotoYaho <nju_y...@apache.org>
Authored: Mon May 16 20:15:56 2016 +0800
Committer: kyotoYaho <nju_y...@apache.org>
Committed: Mon May 16 20:15:56 2016 +0800

----------------------------------------------------------------------
 .../kylin/source/kafka/util/KafkaRequester.java | 24 ++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/06e1ea29/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
index b78d30f..01c3946 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java
@@ -101,7 +101,13 @@ public final class KafkaRequester {
             consumer = getSimpleConsumer(broker, 
kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), 
"topic_meta_lookup");
             List<String> topics = 
Collections.singletonList(kafkaClusterConfig.getTopic());
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
+            TopicMetadataResponse resp;
+            try{
+                resp = consumer.send(req);
+            }catch (Exception e){
+                logger.warn("cannot send TopicMetadataRequest successfully: " 
+ e);
+                continue;
+            }
             final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
             if (topicMetadatas.size() != 1) {
                 break;
@@ -124,12 +130,19 @@ public final class KafkaRequester {
     }
 
     public static PartitionMetadata getPartitionMetadata(String topic, int 
partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) {
+        logger.debug("Brokers: " + brokers.toString());
         SimpleConsumer consumer;
         for (Broker broker : brokers) {
             consumer = getSimpleConsumer(broker, 
kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), 
"topic_meta_lookup");
             List<String> topics = Collections.singletonList(topic);
             TopicMetadataRequest req = new TopicMetadataRequest(topics);
-            TopicMetadataResponse resp = consumer.send(req);
+            TopicMetadataResponse resp;
+            try{
+                resp = consumer.send(req);
+            }catch (Exception e){
+                logger.warn("cannot send TopicMetadataRequest successfully: " 
+ e);
+                continue;
+            }
             final List<TopicMetadata> topicMetadatas = resp.topicsMetadata();
             if (topicMetadatas.size() != 1) {
                 logger.warn("invalid topicMetadata size:" + 
topicMetadatas.size());
@@ -141,6 +154,13 @@ public final class KafkaRequester {
                 break;
             }
             for (PartitionMetadata partitionMetadata : 
topicMetadata.partitionsMetadata()) {
+                StringBuffer logText = new StringBuffer();
+                logText.append("PartitionMetadata debug errorCode: " + 
partitionMetadata.errorCode());
+                logText.append("PartitionMetadata debug partitionId: " + 
partitionMetadata.partitionId());
+                logText.append("PartitionMetadata debug leader: " + 
partitionMetadata.leader());
+                logText.append("PartitionMetadata debug ISR: " + 
partitionMetadata.isr());
+                logText.append("PartitionMetadata debug replica: " + 
partitionMetadata.replicas());
+                logger.info(logText.toString());
                 if (partitionMetadata.partitionId() == partitionId) {
                     return partitionMetadata;
                 }

Reply via email to