Repository: samza
Updated Branches:
  refs/heads/master 40ffe4ea5 -> e8e67ab15


Fix SAMZA-1018.

Check error code from metadata fetch in getSystemStreamPartitionCounts to avoid 
returning no data for newly created topics.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e8e67ab1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e8e67ab1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e8e67ab1

Branch: refs/heads/master
Commit: e8e67ab156967eaaad1aed304fd26ef81efea127
Parents: 40ffe4e
Author: Tommy Becker <[email protected]>
Authored: Mon Sep 19 08:21:12 2016 -0400
Committer: Jacob Maes <[email protected]>
Committed: Fri Sep 23 14:02:50 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/KafkaSystemAdmin.scala    | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e8e67ab1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index ba8de5c..5927cca 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -51,7 +51,7 @@ object KafkaSystemAdmin extends Logging {
           val streamPartitionMetadata = systemStreamPartitions
             .map(systemStreamPartition => {
               val partitionMetadata = new SystemStreamPartitionMetadata(
-                // If the topic/partition is empty then oldest and newest will 
+                // If the topic/partition is empty then oldest and newest will
                 // be stripped of their offsets, so default to null.
                 oldestOffsets.getOrElse(systemStreamPartition, null),
                 newestOffsets.getOrElse(systemStreamPartition, null),
@@ -157,6 +157,7 @@ class KafkaSystemAdmin(
           metadataTTL)
         val result = metadata.map {
           case (topic, topicMetadata) => {
+            KafkaUtil.maybeThrowException(topicMetadata.errorCode)
             val partitionsMap = topicMetadata.partitionsMetadata.map {
               pm =>
                 new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
@@ -183,8 +184,8 @@ class KafkaSystemAdmin(
    * SystemStreamPartition that was passed in.
    */
   override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
-    // This is safe to do with Kafka, even if a topic is key-deduped. If the 
-    // offset doesn't exist on a compacted topic, Kafka will return the first 
+    // This is safe to do with Kafka, even if a topic is key-deduped. If the
+    // offset doesn't exist on a compacted topic, Kafka will return the first
     // message AFTER the offset that was specified in the fetch request.
     offsets.mapValues(offset => (offset.toLong + 1).toString)
   }
@@ -376,7 +377,7 @@ class KafkaSystemAdmin(
   private def getTopicsAndPartitionsByBroker(metadata: Map[String, TopicMetadata]) = {
     val brokersToTopicPartitions = metadata
       .values
-      // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] 
+      // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)]
       .flatMap(topicMetadata => {
         KafkaUtil.maybeThrowException(topicMetadata.errorCode)
         topicMetadata

Reply via email to