[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352499#comment-17352499
 ] 

Almog Gavra commented on KAFKA-12774:
-------------------------------------

I went back to triage the severity of this because if things aren't getting 
logged via Log4j then our applications would be missing log-shipping, which is 
crticial in our production environments.

To confirm nothing was being logged outside of Log4J I hacked together 
something to try to reproduce this and I also added the following line any time 
a record was to be sent in RecordCollectorImpl to make sure I had exactly the 
same type of error that you encountered:
{code:java}
recordSendError(topic, new InvalidPidMappingException("foo"), serializedRecord);
{code}
I could not reproduce. The only things that were logged (notice that all Log4j 
loggers are turned OFF so only things that get logged to stdout get logged) 
were what I specifially logged to stdout. This was the stdout (I printed to 
stdout in the uncaught handler instead of logging):
{code:java}
SEEN: 0,0
(HERE) Uncaught exception handled - replacing thread Error encountered sending 
record to topic bar for task 0_0 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: foo
Exception handler choose to FAIL the processing, no more records would be sent.
{code}
We can leave this ticket open in case anyone else has any Ideas.
----
Here is the app:
{code:java}
public static void main(String[] args) throws InterruptedException, IOException 
{
  LogManager.getRootLogger().setLevel(Level.OFF);
  @SuppressWarnings("unchecked") Enumeration<Logger> loggers = 
LogManager.getCurrentLoggers();
  while (loggers.hasMoreElements()) {
    loggers.nextElement().setLevel(Level.OFF);
  }

  final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1);
  cluster.start();
  cluster.createTopic("foo");

  final Properties config = new Properties();
  config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
  config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 123);
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
  config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
  config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
  config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
  config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
  config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
  config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

  final CountDownLatch sawKey = new CountDownLatch(1);
  final StreamsBuilder builder = new StreamsBuilder();
  builder.stream("foo")
      .filter((k, v) -> k != null)
      .peek((k, v) -> System.out.println("SEEN: " + k + "," + v))
      .peek((k ,v) -> {
        if ((int) k == 0) sawKey.countDown();
      })
      .to("bar");

  final Topology build = builder.build(config);
  final KafkaStreams app = new KafkaStreams(build, config);

  app.setUncaughtExceptionHandler(exception -> {
        System.out.println("(HERE) Uncaught exception handled - replacing 
thread " + exception.getMessage());
        return 
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
      }
  );

  final CountDownLatch startLatch = new CountDownLatch(1);
  app.setStateListener((newState, oldState) -> {
    if (newState == State.RUNNING) {
      startLatch.countDown();
    }
  });
  app.start();
  startLatch.await();

  final Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);

  IntegrationTestUtils.produceKeyValuesSynchronously(
      "foo",
      IntStream.range(0, 1)
          .mapToObj(i -> KeyValue.pair(0, i))
          .collect(Collectors.toList()),
      producerProps,
      Time.SYSTEM);


  sawKey.await();
  app.close();
  app.cleanUp();
  cluster.after();
}{code}
 

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-12774
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12774
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Jørgen
>            Priority: Minor
>             Fix For: 3.0.0, 2.8.1
>
>
> When exceptions is handled in the uncaught-exception handler introduced in 
> KS2.8, the logging of the stacktrace doesn't seem to go through the logging 
> framework configured by the application (log4j2 in our case), but gets 
> printed to console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and gets 
> formatted properly according to the log4j2 appender (json in our case). 
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> <Appenders>
>     <Console name="Console" target="SYSTEM_OUT">
>         <JSONLayout complete="false" compact="true" eventEol="true" 
> stacktraceAsString="true" properties="true">
>             <KeyValuePair key="timestamp" 
> value="$${date:yyyy-MM-dd'T'HH:mm:ss.SSSZ}"/>
>         </JSONLayout>
>     </Console>
> </Appenders> {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
>     logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
>     
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2" 
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic xxx-repartition for task 3_2 due 
> to:org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.Exception handler choose to FAIL the processing, no more 
> records would be sent.  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>        at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
>     at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>      at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>       at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>       at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>      at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)    
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce as I haven't found any way to trigger 
> uncaught-exception-handler through junit-tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to