This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new a08a0ae  MINOR: Enable capture of full stack trace in 
StreamTask#process (#6310)
a08a0ae is described below

commit a08a0aec4bacb67229f39148f6cf5b8df31524bf
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Sat Feb 23 15:09:46 2019 -0500

    MINOR: Enable capture of full stack trace in StreamTask#process (#6310)
    
    Reviewers: Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax 
<matth...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java     | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6dbf394..ca33756 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -47,6 +47,8 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -374,12 +376,15 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         } catch (final ProducerFencedException fatal) {
             throw new TaskMigratedException(this, fatal);
         } catch (final KafkaException e) {
-            throw new StreamsException(format("Exception caught in process. 
taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d",
+            final String stackTrace = getStacktraceString(e);
+            throw new StreamsException(format("Exception caught in process. 
taskId=%s, " +
+                    "processor=%s, topic=%s, partition=%d, offset=%d, 
stacktrace=%s",
                 id(),
                 processorContext.currentNode().name(),
                 record.topic(),
                 record.partition(),
-                record.offset()
+                record.offset(),
+                stackTrace
             ), e);
         } finally {
             processorContext.setCurrentNode(null);
@@ -388,6 +393,18 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
         return true;
     }
 
+    private String getStacktraceString(final KafkaException e) {
+        String stacktrace = null;
+        try (final StringWriter stringWriter = new StringWriter();
+             final PrintWriter printWriter = new PrintWriter(stringWriter)) {
+            e.printStackTrace(printWriter);
+            stacktrace = stringWriter.toString();
+        } catch (final IOException ioe) {
+            log.error("Encountered error extracting stacktrace from this 
exception", ioe);
+        }
+        return stacktrace;
+    }
+
     /**
      * @throws IllegalStateException if the current node is not null
      * @throws TaskMigratedException if the task producer got fenced (EOS only)

Reply via email to