Yes, I confirmed that in KAFKA-9331 (
https://github.com/apache/kafka/pull/9487), we removed the `catch
(exception)` block for adding uncaught exception handler.
Then, in KAFKA-12537(https://github.com/apache/kafka/pull/10387), we added
`catch (Thrwoable)` (in v2.8.0).

So, @Sinclair, if you upgrade to v2.8.0 or later, this issue should be
fixed.

Thank you.
Luke





On Sat, Nov 20, 2021 at 8:49 AM Matthias J. Sax <mj...@apache.org> wrote:

> Not sure what version you are using, but it say `Thrwoable` in `trunk`
>
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L577
>
>
> -Matthias
>
> On 11/18/21 6:09 AM, John Roesler wrote:
> > Thanks for pointing that out, Scott!
> >
> > You’re totally right; that should be a Throwable.
> >
> > Just to put it out there, do you want to just send a quick PR? If not,
> no worries. I’m just asking because it seems like you’ve already done the
> hard part and it might be nice to get the contribution credit.
> >
> > Thanks,
> > John
> >
> > On Thu, Nov 18, 2021, at 08:00, Sinclair Scott wrote:
> >> Hi there,
> >>
> >>
> >> I'm a big fan of KStreams - thanks for all the great work!!
> >>
> >>
> >> I unfortunately (my fault) had a StackOverflowError bug in my KStream
> >> transformer which meant that the KStream died without reporting any
> >> Exception at all.
> >>
> >>
> >> The first log message showed some polling activity and then you see
> >> later the State transition to PENDING_SHUTDOWN
> >>
> >>
> >> Main Consumer poll completed in 2 ms and fetched 1 records
> >> Flushing all global globalStores registered in the state manager
> >> Idempotently invoking restoration logic in state RUNNING
> >> Finished restoring all changelogs []
> >> Idempotent restore call done. Thread state has not changed.
> >> Processing tasks with 1 iterations.
> >> Flushing all global globalStores registered in the state manager
> >> State transition from RUNNING to PENDING_SHUTDOWN
> >>
> >>
> >>
> >> This is because the StreamThread.run() method catches Exception only.
> >>
> >>
> >> I ended up recompiling the kstreams and changing the catch to Throwable
> >> so I can see what was going on. Then I discovered my bad recursive call
> >>   :(
> >>
> >>
> >> Can we please change the Catch to catch Throwable , so that we are
> >> always guaranteed some output?
> >>
> >>
> >> StreamThread.java
> >>
> >> @Override
> >> public void run() {
> >>      log.info("Starting");
> >>      if (setState(State.STARTING) == null) {
> >>          log.info("StreamThread already shutdown. Not running");
> >>          return;
> >>      }
> >>      boolean cleanRun = false;
> >>      try {
> >>          runLoop();
> >>          cleanRun = true;
> >>      } catch (final Exception e) {
> >>          // we have caught all Kafka related exceptions, and other
> >> runtime exceptions
> >>          // should be due to user application errors
> >>
> >>          if (e instanceof UnsupportedVersionException) {
> >>              final String errorMessage = e.getMessage();
> >>              if (errorMessage != null &&
> >>                  errorMessage.startsWith("Broker unexpectedly doesn't
> >> support requireStable flag on version ")) {
> >>
> >>                  log.error("Shutting down because the Kafka cluster
> >> seems to be on a too old version. " +
> >>                      "Setting {}=\"{}\" requires broker version 2.5 or
> >> higher.",
> >>                      StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >>                      EXACTLY_ONCE_BETA);
> >>
> >>                  throw e;
> >>              }
> >>          }
> >>
> >>          log.error("Encountered the following exception during
> processing " +
> >>              "and the thread is going to shut down: ", e);
> >>          throw e;
> >>      } finally {
> >>          completeShutdown(cleanRun);
> >>      }
> >> }
> >>
> >>
> >> Thanks and kind regards
> >>
> >>
> >> Scott Sinclair
>

Reply via email to