Updated Branches: refs/heads/trunk d2e2c607d -> d401292ab
KAFKA-1200 inconsistent log levels when consumed offset is reset patch by Dima Pekar reviewed by Joe Stein Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d401292a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d401292a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d401292a Branch: refs/heads/trunk Commit: d401292abb0f3660895173d4613f712058ae097f Parents: d2e2c60 Author: Joe Stein <[email protected]> Authored: Mon Jan 13 21:16:32 2014 -0500 Committer: Joe Stein <[email protected]> Committed: Mon Jan 13 21:16:32 2014 -0500 ---------------------------------------------------------------------- .../main/scala/kafka/server/AbstractFetcherThread.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d401292a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index bb2dd90..db7017b 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -97,7 +97,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } catch { case t: Throwable => if (isRunning.get) { - warn("Error in fetch %s".format(fetchRequest), t) + error("Error in fetch %s".format(fetchRequest), t) partitionMapLock synchronized { partitionsWithError ++= partitionMap.keys } @@ -134,7 +134,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and // should get fixed in the subsequent fetches - logger.warn("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) + logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) case e: Throwable => throw new KafkaException("error processing data for partition [%s,%d] offset %d" .format(topic, partitionId, currentOffset.get), e) @@ -143,16 +143,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke try { val newOffset = handleOffsetOutOfRange(topicAndPartition) partitionMap.put(topicAndPartition, newOffset) - warn("Current offset %d for partition [%s,%d] out of range; reset offset to %d" + error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" .format(currentOffset.get, topic, partitionId, newOffset)) } catch { case e: Throwable => - warn("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) + error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition } case _ => if (isRunning.get) { - warn("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, + error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, ErrorMapping.exceptionFor(partitionData.error).getClass)) partitionsWithError += topicAndPartition }
