This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84b111f  KAFKA-12963: Add processor name to error (#11262)
84b111f is described below

commit 84b111f968a47c65d3498a1b3af42dd644403728
Author: Andy Lapidas <[email protected]>
AuthorDate: Fri Aug 27 22:06:49 2021 -0500

    KAFKA-12963: Add processor name to error (#11262)
    
    This PR adds the processor name to the text of the StreamsException that
    ProcessorNode.process() throws when it catches a ClassCastException.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../org/apache/kafka/streams/processor/internals/ProcessorNode.java    | 3 ++-
 .../apache/kafka/streams/processor/internals/ProcessorNodeTest.java    | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index dfc0b70..48c95f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -147,7 +147,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
         } catch (final ClassCastException e) {
             final String keyClass = record.key() == null ? "unknown because 
key is null" : record.key().getClass().getName();
             final String valueClass = record.value() == null ? "unknown 
because value is null" : record.value().getClass().getName();
-            throw new StreamsException(String.format("ClassCastException 
invoking Processor. Do the Processor's "
+            throw new StreamsException(String.format("ClassCastException 
invoking processor: %s. Do the Processor's "
                     + "input types match the deserialized types? Check the 
Serde setup and change the default Serdes in "
                     + "StreamConfig or provide correct Serdes via method 
parameters. Make sure the Processor can accept "
                     + "the deserialized input of type key: %s, and value: 
%s.%n"
@@ -155,6 +155,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                     + "another cause (in user code, for example). For example, 
if a processor wires in a store, but casts "
                     + "the generics incorrectly, a class cast exception could 
be raised during processing, but the "
                     + "cause would not be wrong Serdes.",
+                    this.name(),
                     keyClass,
                     valueClass),
                 e);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 73147ed..87a4c68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -193,7 +193,7 @@ public class ProcessorNodeTest {
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, "test-client", 
StreamsConfig.METRICS_LATEST, new MockTime());
         final InternalMockProcessorContext<Object, Object> context = new 
InternalMockProcessorContext<>(streamsMetrics);
-        final ProcessorNode<Object, Object, Object, Object> node = new 
ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+        final ProcessorNode<Object, Object, Object, Object> node = new 
ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
         node.init(context);
         final StreamsException se = assertThrows(
             StreamsException.class,
@@ -202,5 +202,6 @@ public class ProcessorNodeTest {
         assertThat(se.getCause(), instanceOf(ClassCastException.class));
         assertThat(se.getMessage(), containsString("default Serdes"));
         assertThat(se.getMessage(), containsString("input types"));
+        assertThat(se.getMessage(), containsString("pname"));
     }
 }

Reply via email to