Repository: samza Updated Branches: refs/heads/master 317b6ff1b -> 61d35f26c
SAMZA-1028: Fix KafkaSystemProducer logging and use an AtomicReference for tracking producer exceptions Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/61d35f26 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/61d35f26 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/61d35f26 Branch: refs/heads/master Commit: 61d35f26c1f924f376b30f60d05c1957364c05e5 Parents: 317b6ff Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz> Authored: Thu Sep 29 14:51:23 2016 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Thu Sep 29 14:52:27 2016 -0700 ---------------------------------------------------------------------- .../system/kafka/KafkaSystemProducer.scala | 69 +++++++++++++------- 1 file changed, 47 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/61d35f26/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 5ff6d3c..aac53fc 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -20,8 +20,8 @@ package org.apache.samza.system.kafka -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Future} import org.apache.kafka.clients.producer.Callback import org.apache.kafka.clients.producer.Producer @@ -56,12 +56,13 @@ class KafkaSystemProducer(systemName: String, @volatile var latestFuture: Future[RecordMetadata] = null /** - * exceptionThrown: to store the exception in case of any "ultimate" send failure (ie. failure + * exceptionInCallback: to store the exception in case of any "ultimate" send failure (ie. failure * after exhausting max_retries in Kafka producer) in the I/O thread, we do not continue to queue up more send * requests from the samza thread. It helps the samza thread identify if the failure happened in I/O thread or not. + * + * In cases of multiple exceptions in the callbacks, we keep the first one before throwing. */ - @volatile - var exceptionThrown: SamzaException = null + var exceptionInCallback: AtomicReference[SamzaException] = new AtomicReference[SamzaException]() } @volatile var producer: Producer[Array[Byte], Array[Byte]] = null @@ -80,13 +81,13 @@ class KafkaSystemProducer(systemName: String, producer = null sources.foreach {p => - if (p._2.exceptionThrown == null) { + if (p._2.exceptionInCallback.get() == null) { flush(p._1) } } } } catch { - case e: Exception => logger.error(e.getMessage, e) + case e: Exception => error(e.getMessage, e) } } } @@ -97,6 +98,21 @@ class KafkaSystemProducer(systemName: String, } } + def closeAndNullifyCurrentProducer(currentProducer: Producer[Array[Byte], Array[Byte]]) { + try { + // TODO: we should use timeout close() to make sure we fail all waiting messages in kafka 0.9+ + currentProducer.close() + } catch { + case e: Exception => error("producer close failed", e) + } + producerLock.synchronized { + if (currentProducer == producer) { + // only nullify the member producer if it is still the same object, no point nullifying new producer + producer = null + } + } + } + def send(source: String, envelope: OutgoingMessageEnvelope) { trace("Enqueuing message: %s, %s." format (source, envelope)) @@ -110,10 +126,10 @@ class KafkaSystemProducer(systemName: String, throw new IllegalArgumentException("Source %s must be registered first before send." format source) } - val exception = sourceData.exceptionThrown + val exception = sourceData.exceptionInCallback.getAndSet(null) if (exception != null) { metrics.sendFailed.inc - throw exception + throw exception // in case the caller catches all exceptions and will try again } // lazy initialization of the producer @@ -134,8 +150,7 @@ class KafkaSystemProducer(systemName: String, // Java-based Kafka producer API requires an "Integer" type partitionKey and does not allow custom overriding of Partitioners // Any kind of custom partitioning has to be done on the client-side val partitions: java.util.List[PartitionInfo] = currentProducer.partitionsFor(topicName) - val partitionKey = if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) - else null + val partitionKey = if (envelope.getPartitionKey != null) KafkaUtil.getIntegerPartitionKey(envelope, partitions) else null val record = new ProducerRecord(envelope.getSystemStream.getStream, partitionKey, envelope.getKey.asInstanceOf[Array[Byte]], @@ -147,17 +162,23 @@ class KafkaSystemProducer(systemName: String, currentProducer.send(record, new Callback { def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = { if (exception == null) { - //send was successful. Don't retry + //send was successful. metrics.sendSuccess.inc } else { - //If there is an exception in the callback, fail container! - //Close producer. - currentProducer.close - sourceData.exceptionThrown = new SamzaException("Unable to send message from %s to system %s." format(source, systemName), - exception) + error("Closing the producer because of an exception in callback: ", exception) + //If there is an exception in the callback, close producer. + closeAndNullifyCurrentProducer(currentProducer) + + // we keep the exception and will throw the exception in the next producer.send() + // so the user can handle the exception and decide to fail or ignore + sourceData.exceptionInCallback.compareAndSet( + null, + new SamzaException("Unable to send message from %s to system %s." format(source, systemName), + exception)) + metrics.sendFailed.inc - logger.error("Unable to send message on Topic:%s Partition:%s" format(topicName, partitionKey), + error("Unable to send message on Topic:%s Partition:%s" format(topicName, partitionKey), exception) } } @@ -167,7 +188,10 @@ class KafkaSystemProducer(systemName: String, metrics.sends.inc } catch { case e: Exception => { - currentProducer.close() + error("Closing the producer because of an exception in send: ", e) + + closeAndNullifyCurrentProducer(currentProducer) + metrics.sendFailed.inc throw new SamzaException(("Failed to send message on Topic:%s Partition:%s Exception:\n %s,") .format(topicName, partitionKey, e)) @@ -183,7 +207,7 @@ class KafkaSystemProducer(systemName: String, //if latestFuture is null, it probably means that there has been no calls to "send" messages //Hence, nothing to do in flush if(sourceData.latestFuture != null) { - while(!sourceData.latestFuture.isDone && sourceData.exceptionThrown == null) { + while(!sourceData.latestFuture.isDone && sourceData.exceptionInCallback.get() == null) { try { sourceData.latestFuture.get() } catch { @@ -191,9 +215,10 @@ class KafkaSystemProducer(systemName: String, } } - if (sourceData.exceptionThrown != null) { + //if there is an exception thrown from the previous callbacks just before flush, we have to fail the container + if (sourceData.exceptionInCallback.get() != null) { metrics.flushFailed.inc - throw sourceData.exceptionThrown + throw sourceData.exceptionInCallback.get() } else { trace("Flushed %s." format (source)) }