Repository: kylin Updated Branches: refs/heads/master b7ae177f4 -> 06e1ea296
KYLIN-1696: add catch exception during sending topic meta request to broker Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/06e1ea29 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/06e1ea29 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/06e1ea29 Branch: refs/heads/master Commit: 06e1ea2960c2d0dffc9245fa52a0e281d72c846c Parents: b7ae177 Author: kyotoYaho <nju_y...@apache.org> Authored: Mon May 16 20:15:56 2016 +0800 Committer: kyotoYaho <nju_y...@apache.org> Committed: Mon May 16 20:15:56 2016 +0800 ---------------------------------------------------------------------- .../kylin/source/kafka/util/KafkaRequester.java | 24 ++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/06e1ea29/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java index b78d30f..01c3946 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaRequester.java @@ -101,7 +101,13 @@ public final class KafkaRequester { consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup"); List<String> topics = Collections.singletonList(kafkaClusterConfig.getTopic()); TopicMetadataRequest req = new TopicMetadataRequest(topics); - TopicMetadataResponse resp = consumer.send(req); + TopicMetadataResponse resp; + try{ + resp = consumer.send(req); + }catch (Exception e){ + logger.warn("cannot send TopicMetadataRequest successfully: " + e); + continue; + } final List<TopicMetadata> topicMetadatas = resp.topicsMetadata(); if (topicMetadatas.size() != 1) { break; @@ -124,12 +130,19 @@ public final class KafkaRequester { } public static PartitionMetadata getPartitionMetadata(String topic, int partitionId, List<Broker> brokers, KafkaClusterConfig kafkaClusterConfig) { + logger.debug("Brokers: " + brokers.toString()); SimpleConsumer consumer; for (Broker broker : brokers) { consumer = getSimpleConsumer(broker, kafkaClusterConfig.getTimeout(), kafkaClusterConfig.getBufferSize(), "topic_meta_lookup"); List<String> topics = Collections.singletonList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); - TopicMetadataResponse resp = consumer.send(req); + TopicMetadataResponse resp; + try{ + resp = consumer.send(req); + }catch (Exception e){ + logger.warn("cannot send TopicMetadataRequest successfully: " + e); + continue; + } final List<TopicMetadata> topicMetadatas = resp.topicsMetadata(); if (topicMetadatas.size() != 1) { logger.warn("invalid topicMetadata size:" + topicMetadatas.size()); @@ -141,6 +154,13 @@ public final class KafkaRequester { break; } for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) { + StringBuffer logText = new StringBuffer(); + logText.append("PartitionMetadata debug errorCode: " + partitionMetadata.errorCode()); + logText.append("PartitionMetadata debug partitionId: " + partitionMetadata.partitionId()); + logText.append("PartitionMetadata debug leader: " + partitionMetadata.leader()); + logText.append("PartitionMetadata debug ISR: " + partitionMetadata.isr()); + logText.append("PartitionMetadata debug replica: " + partitionMetadata.replicas()); + logger.info(logText.toString()); if (partitionMetadata.partitionId() == partitionId) { return partitionMetadata; }