This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 12e8b8c MINOR: Enable capture of full stack trace in StreamTask#process (#6310) 12e8b8c is described below commit 12e8b8c2c7ddce09daa449d9843e9fa08cd538f1 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Sat Feb 23 15:09:46 2019 -0500 MINOR: Enable capture of full stack trace in StreamTask#process (#6310) Reviewers: Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../streams/processor/internals/StreamTask.java | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6dbf394..ca33756 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -47,6 +47,8 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -374,12 +376,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator } catch (final ProducerFencedException fatal) { throw new TaskMigratedException(this, fatal); } catch (final KafkaException e) { - throw new StreamsException(format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", + final String stackTrace = getStacktraceString(e); + throw new StreamsException(format("Exception caught in process. taskId=%s, " + + "processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s", id(), processorContext.currentNode().name(), record.topic(), record.partition(), - record.offset() + record.offset(), + stackTrace ), e); } finally { processorContext.setCurrentNode(null); @@ -388,6 +393,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator return true; } + private String getStacktraceString(final KafkaException e) { + String stacktrace = null; + try (final StringWriter stringWriter = new StringWriter(); + final PrintWriter printWriter = new PrintWriter(stringWriter)) { + e.printStackTrace(printWriter); + stacktrace = stringWriter.toString(); + } catch (final IOException ioe) { + log.error("Encountered error extracting stacktrace from this exception", ioe); + } + return stacktrace; + } + /** * @throws IllegalStateException if the current node is not null * @throws TaskMigratedException if the task producer got fenced (EOS only)