[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343624#comment-17343624
 ] 

Walker Carlson commented on KAFKA-12774:
----------------------------------------

I looked at the code and it seems that in the replace thread path we don't 
actually log the exception unless its for the global thread, we just rethrow 
the exception to kill the thread. This is what we used to do so I am not sure 
why it is not logging correctly. We do for all other cases. 

The other paths we log the exception with some flavor text, we can add it to 
the replace thread option but probably we just want to add a log at the top of 
the handler with the error.

I suppose that we should also see if the same behavior with the shutdown client 
and application options.

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-12774
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12774
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Jørgen
>            Priority: Minor
>
> When exceptions is handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and gets 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> <Appenders>
>     <Console name="Console" target="SYSTEM_OUT">
>         <JSONLayout complete="false" compact="true" eventEol="true" 
> stacktraceAsString="true" properties="true">
>             <KeyValuePair key="timestamp" 
> value="$${date:yyyy-MM-dd'T'HH:mm:ss.SSSZ}"/>
>         </JSONLayout>
>     </Console>
> </Appenders> {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
>     logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
>     
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>        at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
>     at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>      at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>      at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)    
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to