SAMZA-592; ignore ReplicaNotAvailable exceptions, and be more aggressive about refreshing Kafka topic metadata
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1202df41 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1202df41 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1202df41 Branch: refs/heads/samza-sql Commit: 1202df4171f44a877997301218fe3c3037abf2d5 Parents: 23c4b39 Author: Chris Riccomini <[email protected]> Authored: Fri Mar 13 14:52:03 2015 -0700 Committer: Chris Riccomini <[email protected]> Committed: Fri Mar 13 14:52:03 2015 -0700 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointManager.scala | 7 ++-- .../apache/samza/system/kafka/BrokerProxy.scala | 3 +- .../apache/samza/system/kafka/GetOffset.scala | 7 ++-- .../samza/system/kafka/KafkaSystemAdmin.scala | 10 ++--- .../samza/system/kafka/TopicMetadataCache.scala | 12 +++++- .../scala/org/apache/samza/util/KafkaUtil.scala | 39 ++++++++++++++++---- .../system/kafka/TestKafkaSystemAdmin.scala | 8 ++-- .../system/kafka/TestTopicMetadataCache.scala | 14 ++++++- .../org/apache/samza/utils/TestKafkaUtil.scala | 33 +++++++++++++++++ .../test/integration/TestStatefulTask.scala | 8 +--- 10 files changed, 107 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 4a1b31f..c9504ec 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -44,6 +44,7 @@ import 
org.apache.samza.util.TopicMetadataStore import scala.collection.mutable import java.util.Properties import org.apache.kafka.clients.producer.{Producer, ProducerRecord} +import org.apache.samza.util.KafkaUtil /** * Kafka checkpoint manager is used to store checkpoints in a Kafka topic. @@ -159,7 +160,7 @@ class KafkaCheckpointManager( .get(topicAndPartition) .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic)) // Fail or retry if there was an an issue with the offset request. - ErrorMapping.maybeThrowException(offsetResponse.error) + KafkaUtil.maybeThrowException(offsetResponse.error) val offset: Long = offsetResponse .offsets @@ -287,7 +288,7 @@ class KafkaCheckpointManager( warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic)) return } - ErrorMapping.maybeThrowException(errorCode) + KafkaUtil.maybeThrowException(errorCode) } for (response <- fetchResponse.messageSet(checkpointTopic, 0)) { @@ -385,7 +386,7 @@ class KafkaCheckpointManager( loop => { val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo) val topicMetadata = topicMetadataMap(checkpointTopic) - ErrorMapping.maybeThrowException(topicMetadata.errorCode) + KafkaUtil.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length if (partitionCount != 1) { http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 
c6e231a..614f33f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -34,6 +34,7 @@ import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX import scala.collection.JavaConversions._ import scala.collection.concurrent import scala.collection.mutable +import org.apache.samza.util.KafkaUtil /** * Companion object for class JvmMetrics encapsulating various constants @@ -235,7 +236,7 @@ class BrokerProxy( // handle the recoverable errors. remainingErrors.foreach(e => { warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(",")) - ErrorMapping.maybeThrowException(e.code) }) + KafkaUtil.maybeThrowException(e.code) }) notLeaderOrUnknownTopic.foreach(e => abdicate(e.tp)) http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala index 147aabc..5528702 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala @@ -21,12 +21,13 @@ package org.apache.samza.system.kafka -import kafka.common.{ OffsetOutOfRangeException, ErrorMapping } +import kafka.common.OffsetOutOfRangeException import kafka.api._ import kafka.common.TopicAndPartition import kafka.api.PartitionOffsetRequestInfo import org.apache.samza.util.Logging import kafka.message.MessageAndOffset +import org.apache.samza.util.KafkaUtil /** * GetOffset validates offsets for topic partitions, and manages fetching new @@ -59,7 +60,7 @@ class GetOffset( val messages = 
consumer.defaultFetch((topicAndPartition, offset.toLong)) if (messages.hasError) { - ErrorMapping.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition)) + KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition)) } info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition)) @@ -85,7 +86,7 @@ class GetOffset( .get(topicAndPartition) .getOrElse(toss("Unable to find offset information for %s" format topicAndPartition)) - ErrorMapping.maybeThrowException(partitionOffsetResponse.error) + KafkaUtil.maybeThrowException(partitionOffsetResponse.error) partitionOffsetResponse .offsets http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index b790be1..f783c57 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -28,12 +28,13 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging} import kafka.api._ import kafka.consumer.SimpleConsumer -import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping} +import kafka.common.{TopicExistsException, TopicAndPartition} import java.util.{Properties, UUID} import scala.collection.JavaConversions._ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import kafka.consumer.ConsumerConfig import kafka.admin.AdminUtils +import org.apache.samza.util.KafkaUtil object KafkaSystemAdmin 
extends Logging { /** @@ -222,12 +223,11 @@ class KafkaSystemAdmin( .values // Convert the topic metadata to a Seq[(Broker, TopicAndPartition)] .flatMap(topicMetadata => { - ErrorMapping.maybeThrowException(topicMetadata.errorCode) + KafkaUtil.maybeThrowException(topicMetadata.errorCode) topicMetadata .partitionsMetadata // Convert Seq[PartitionMetadata] to Seq[(Broker, TopicAndPartition)] .map(partitionMetadata => { - ErrorMapping.maybeThrowException(partitionMetadata.errorCode) val topicAndPartition = new TopicAndPartition(topicMetadata.topic, partitionMetadata.partitionId) val leader = partitionMetadata .leader @@ -263,7 +263,7 @@ class KafkaSystemAdmin( .getOffsetsBefore(new OffsetRequest(partitionOffsetInfo)) .partitionErrorAndOffsets .mapValues(partitionErrorAndOffset => { - ErrorMapping.maybeThrowException(partitionErrorAndOffset.error) + KafkaUtil.maybeThrowException(partitionErrorAndOffset.error) partitionErrorAndOffset.offsets.head }) @@ -320,7 +320,7 @@ class KafkaSystemAdmin( val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo) val topicMetadata = topicMetadataMap(topicName) - ErrorMapping.maybeThrowException(topicMetadata.errorCode) + KafkaUtil.maybeThrowException(topicMetadata.errorCode) val partitionCount = topicMetadata.partitionsMetadata.length if (partitionCount < numKafkaChangelogPartitions) { http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala index 4a49d22..82ecf1a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/TopicMetadataCache.scala @@ -21,7 +21,7 @@ package org.apache.samza.system.kafka import org.apache.samza.util.Logging import kafka.api.TopicMetadata -import kafka.common.ErrorMapping +import org.apache.samza.util.KafkaUtil /** * TopicMetadataCache is used to cache all the topic metadata for Kafka per @@ -43,7 +43,7 @@ object TopicMetadataCache extends Logging { val missingTopics = topics.filter(topic => !topicMetadataMap.contains(systemName, topic)) val topicsWithBadOrExpiredMetadata = (topics -- missingTopics).filter(topic => { val metadata = topicMetadataMap(systemName, topic) - metadata.streamMetadata.errorCode != ErrorMapping.NoError || ((time - metadata.lastRefreshMs) > cacheTimeout) + hasBadErrorCode(metadata.streamMetadata) || ((time - metadata.lastRefreshMs) > cacheTimeout) }) val topicsToRefresh = missingTopics ++ topicsWithBadOrExpiredMetadata @@ -67,4 +67,12 @@ object TopicMetadataCache extends Logging { def clear { topicMetadataMap.clear } + + /** + * Helper method to check if a topic's metadata has a bad errorCode, or if a + * partition's metadata has a bad errorCode. 
+ */ + def hasBadErrorCode(streamMetadata: TopicMetadata) = { + KafkaUtil.isBadErrorCode(streamMetadata.errorCode) || streamMetadata.partitionsMetadata.exists(partitionMetadata => KafkaUtil.isBadErrorCode(partitionMetadata.errorCode)) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index 2482f23..a7a095b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -19,15 +19,16 @@ package org.apache.samza.util -import org.apache.samza.config.{Config, ConfigException} +import java.util.concurrent.atomic.AtomicLong +import org.apache.kafka.common.PartitionInfo +import org.apache.samza.config.Config +import org.apache.samza.config.ConfigException import org.apache.samza.config.JobConfig.Config2Job -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import org.apache.samza.system.OutgoingMessageEnvelope -import org.apache.kafka.common.utils.Utils -import java.util.Random -import org.apache.kafka.common.PartitionInfo +import kafka.common.ErrorMapping +import kafka.common.ReplicaNotAvailableException -object KafkaUtil { +object KafkaUtil extends Logging { val counter = new AtomicLong(0) def getClientId(id: String, config: Config): String = getClientId( @@ -43,10 +44,34 @@ object KafkaUtil { System.currentTimeMillis, counter.getAndIncrement) - private def abs(n: Int) = if(n == Integer.MIN_VALUE) 0 else math.abs(n) + private def abs(n: Int) = if (n == Integer.MIN_VALUE) 0 else math.abs(n) def getIntegerPartitionKey(envelope: OutgoingMessageEnvelope, partitions: java.util.List[PartitionInfo]): Integer = { val numPartitions = partitions.size abs(envelope.getPartitionKey.hashCode()) 
% numPartitions } + + /** + * Exactly the same as Kafka's ErrorMapping.maybeThrowException + * implementation, except suppresses ReplicaNotAvailableException exceptions. + * According to the Kafka + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">protocol + * docs</a>, ReplicaNotAvailableException can be safely ignored. + */ + def maybeThrowException(code: Short) { + try { + ErrorMapping.maybeThrowException(code) + } catch { + case e: ReplicaNotAvailableException => + debug("Got ReplicaNotAvailableException, but ignoring since it's safe to do so.") + } + } + + /** + * Checks if a Kafka errorCode is "bad" or not. "Bad" is defined as any + * errorCode that's not NoError and also not ReplicaNotAvailableCode. + */ + def isBadErrorCode(code: Short) = { + code != ErrorMapping.NoError && code != ErrorMapping.ReplicaNotAvailableCode + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 3d1e6ec..0380d35 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -22,9 +22,7 @@ package org.apache.samza.system.kafka import java.util.Properties - import kafka.admin.AdminUtils -import kafka.common.ErrorMapping import kafka.consumer.Consumer import kafka.consumer.ConsumerConfig import kafka.server.KafkaConfig @@ -34,7 +32,6 @@ import kafka.utils.TestZKUtils import kafka.utils.Utils import kafka.utils.ZKStringSerializer import kafka.zk.EmbeddedZookeeper - import org.I0Itec.zkclient.ZkClient import org.apache.samza.Partition import 
org.apache.samza.system.SystemStreamMetadata @@ -45,11 +42,12 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore import org.apache.samza.util.TopicMetadataStore import org.junit.Assert._ import org.junit.{Test, BeforeClass, AfterClass} - import scala.collection.JavaConversions._ import org.apache.samza.config.KafkaProducerConfig import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer} import java.util +import kafka.common.ErrorMapping +import org.apache.samza.util.KafkaUtil object TestKafkaSystemAdmin { val TOPIC = "input" @@ -113,7 +111,7 @@ object TestKafkaSystemAdmin { val topicMetadata = topicMetadataMap(TOPIC) val errorCode = topicMetadata.errorCode - ErrorMapping.maybeThrowException(errorCode) + KafkaUtil.maybeThrowException(errorCode) done = expectedPartitionCount == topicMetadata.partitionsMetadata.size } catch { http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala index e698d2f..50c13ab 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestTopicMetadataCache.scala @@ -21,15 +21,15 @@ package org.apache.samza.system.kafka import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } - import kafka.api.TopicMetadata - import org.I0Itec.zkclient.ZkClient import org.apache.samza.util.Clock import org.apache.samza.util.TopicMetadataStore import org.junit.Assert._ import org.junit.Before import org.junit.Test +import kafka.common.ErrorMapping +import kafka.api.PartitionMetadata class TestTopicMetadataCache { @@ -124,4 +124,14 @@ class 
TestTopicMetadataCache { assertTrue(numAssertionSuccess.get()) assertEquals(1, mockStore.numberOfCalls.get()) } + + @Test + def testBadErrorCodes { + val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.LeaderNotAvailableCode) + val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), errorCode = ErrorMapping.NoError) + assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.RequestTimedOutCode))) + assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), ErrorMapping.NoError))) + assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, ErrorMapping.NoError))) + assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), ErrorMapping.NoError))) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala new file mode 100644 index 0000000..848cfc8 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/utils/TestKafkaUtil.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.utils + +import org.junit.Test +import org.scalatest.{ Matchers => ScalaTestMatchers } +import org.apache.samza.util.KafkaUtil +import kafka.common.ErrorMapping + +class TestKafkaUtil extends ScalaTestMatchers { + @Test + def testMaybeThrowException { + intercept[Exception] { KafkaUtil.maybeThrowException(ErrorMapping.UnknownTopicOrPartitionCode) } + KafkaUtil.maybeThrowException(ErrorMapping.ReplicaNotAvailableCode) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/1202df41/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index a8b724b..d66b3bd 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -22,9 +22,7 @@ package org.apache.samza.test.integration import java.util.Properties import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit - import kafka.admin.AdminUtils -import kafka.common.ErrorMapping import kafka.consumer.Consumer import kafka.consumer.ConsumerConfig import kafka.message.MessageAndMetadata @@ -35,7 +33,6 @@ import kafka.utils.TestZKUtils import kafka.utils.Utils import kafka.utils.ZKStringSerializer import 
kafka.zk.EmbeddedZookeeper - import org.I0Itec.zkclient.ZkClient import org.apache.samza.Partition import org.apache.samza.checkpoint.Checkpoint @@ -59,14 +56,13 @@ import org.apache.samza.util.ClientUtilTopicMetadataStore import org.apache.samza.util.TopicMetadataStore import org.junit.Assert._ import org.junit.{BeforeClass, AfterClass, Test} - import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.mutable.SynchronizedMap import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer} import java.util - +import org.apache.samza.util.KafkaUtil object TestStatefulTask { val INPUT_TOPIC = "input" @@ -145,7 +141,7 @@ object TestStatefulTask { val topicMetadata = topicMetadataMap(topic) val errorCode = topicMetadata.errorCode - ErrorMapping.maybeThrowException(errorCode) + KafkaUtil.maybeThrowException(errorCode) }) done = true
