Repository: samza Updated Branches: refs/heads/master e3f85871c -> 5a88b9e47
SAMZA-1112; BrokerProxy does not log fatal errors Add an UncaughtExceptionHandler to the broker proxy thread so failures there get logged. Author: Tommy Becker <tobec...@tivo.com> Reviewers: Jagadish <jagad...@apache.org> Closes #80 from twbecker/SAMZA-1112 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5a88b9e4 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5a88b9e4 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5a88b9e4 Branch: refs/heads/master Commit: 5a88b9e47cd9b2a2aba742ff4fe8eeefb7a87e92 Parents: e3f8587 Author: Tommy Becker <tobec...@tivo.com> Authored: Sat Mar 11 06:56:48 2017 -0800 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Sat Mar 11 06:56:48 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/job/local/ThreadJob.scala | 4 +++- .../apache/samza/system/kafka/BrokerProxy.scala | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5a88b9e4/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala index 63754c5..e0522b1 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala @@ -19,6 +19,8 @@ package org.apache.samza.job.local +import java.lang.Thread.UncaughtExceptionHandler + import org.apache.samza.util.Logging import org.apache.samza.job.StreamJob import org.apache.samza.job.ApplicationStatus @@ -42,7 +44,7 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging { runnable.run jobStatus = Some(SuccessfulFinish) } catch { - case e: Exception => { + case e: Throwable => { error("Failing job with exception.", e) jobStatus = Some(UnsuccessfulFinish) throw e http://git-wip-us.apache.org/repos/asf/samza/blob/5a88b9e4/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index cbb8881..539a439 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -21,17 +21,20 @@ package org.apache.samza.system.kafka +import java.lang.Thread.UncaughtExceptionHandler import java.nio.channels.ClosedByInterruptException import java.util.Map.Entry import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} + import kafka.api._ -import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition} +import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException} import kafka.consumer.ConsumerConfig import kafka.message.MessageSet import org.apache.samza.SamzaException import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.Logging import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + import scala.collection.JavaConversions._ import scala.collection.concurrent import scala.collection.mutable @@ -198,8 +201,8 @@ class BrokerProxy( } /** - * Releases ownership for a single TopicAndPartition. The - * KafkaSystemConsumer will try and find a new broker for the + * Releases ownership for a single TopicAndPartition. The + * KafkaSystemConsumer will try and find a new broker for the * TopicAndPartition. */ def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match { @@ -209,8 +212,8 @@ class BrokerProxy( } /** - * Releases all TopicAndPartition ownership for this BrokerProxy thread. The - * KafkaSystemConsumer will try and find a new broker for the + * Releases all TopicAndPartition ownership for this BrokerProxy thread. The + * KafkaSystemConsumer will try and find a new broker for the * TopicAndPartition. */ def abdicateAll { @@ -295,6 +298,9 @@ class BrokerProxy( info("Starting " + toString) thread.setDaemon(true) thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName) + thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { + override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e) + }) thread.start } else { debug("Tried to start an already started broker proxy (%s). Ignoring." format toString) @@ -330,4 +336,4 @@ class BrokerProxy( } } } -} \ No newline at end of file +}