Would you mind filing a ticket for this? Would be nice to have it on the
books so other users know we know about it, and more importantly which
versions this is and is not fixed in.

Thanks for the report!

On Thu, Nov 18, 2021 at 6:10 AM John Roesler <vvcep...@apache.org> wrote:

> Thanks for pointing that out, Scott!
>
> You’re totally right; that should be a Throwable.
>
> Just to put it out there, do you want to just send a quick PR? If not, no
> worries. I’m just asking because it seems like you’ve already done the hard
> part and it might be nice to get the contribution credit.
>
> Thanks,
> John
>
> On Thu, Nov 18, 2021, at 08:00, Sinclair Scott wrote:
> > Hi there,
> >
> >
> > I'm a big fan of KStreams - thanks for all the great work!!
> >
> >
> > I unfortunately (my fault) had a StackOverflowError bug in my KStream
> > transformer which meant that the KStream died without reporting any
> > Exception at all.
> >
> >
> > The first log message showed some polling activity and then you see
> > later the State transition to PENDING_SHUTDOWN
> >
> >
> > Main Consumer poll completed in 2 ms and fetched 1 records
> > Flushing all global globalStores registered in the state manager
> > Idempotently invoking restoration logic in state RUNNING
> > Finished restoring all changelogs []
> > Idempotent restore call done. Thread state has not changed.
> > Processing tasks with 1 iterations.
> > Flushing all global globalStores registered in the state manager
> > State transition from RUNNING to PENDING_SHUTDOWN
> >
> >
> >
> > This is because the StreamThread.run() method catches Exception only.
> >
> >
> > I ended up recompiling the kstreams and changing the catch to Throwable
> > so I can see what was going on. Then I discovered my bad recursive call
> >  :(
> >
> >
> > Can we please change the Catch to catch Throwable , so that we are
> > always guaranteed some output?
> >
> >
> > StreamThread.java
> >
> > @Override
> > public void run() {
> >     log.info("Starting");
> >     if (setState(State.STARTING) == null) {
> >         log.info("StreamThread already shutdown. Not running");
> >         return;
> >     }
> >     boolean cleanRun = false;
> >     try {
> >         runLoop();
> >         cleanRun = true;
> >     } catch (final Exception e) {
> >         // we have caught all Kafka related exceptions, and other
> > runtime exceptions
> >         // should be due to user application errors
> >
> >         if (e instanceof UnsupportedVersionException) {
> >             final String errorMessage = e.getMessage();
> >             if (errorMessage != null &&
> >                 errorMessage.startsWith("Broker unexpectedly doesn't
> > support requireStable flag on version ")) {
> >
> >                 log.error("Shutting down because the Kafka cluster
> > seems to be on a too old version. " +
> >                     "Setting {}=\"{}\" requires broker version 2.5 or
> > higher.",
> >                     StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
> >                     EXACTLY_ONCE_BETA);
> >
> >                 throw e;
> >             }
> >         }
> >
> >         log.error("Encountered the following exception during processing
> " +
> >             "and the thread is going to shut down: ", e);
> >         throw e;
> >     } finally {
> >         completeShutdown(cleanRun);
> >     }
> > }
> >
> >
> > Thanks and kind regards
> >
> >
> > Scott Sinclair
>

Reply via email to