Hi devs,

I’m using a custom ConsumerRebalanceListener with a plain old Kafka consumer to 
manage offsets in external storage, as described in 
ConsumerRebalanceListener’s javadoc.

When that storage is not available, I throw an exception from my listener. 
However, the ConsumerCoordinator simply logs the exception and ignores it, 
as can be seen in these two code snippets from it:

        try {
            listener.onPartitionsRevoked(revoked);
        } catch (WakeupException | InterruptException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} failed on partition revocation", listener.getClass().getName(), e);
        }

and 

        try {
            listener.onPartitionsAssigned(assignedPartitions);
        } catch (WakeupException | InterruptException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} failed on partition assignment", listener.getClass().getName(), e);
        }

The consumption continues as if nothing has happened.

I wonder what the reason for this behaviour is, as it leaves no means for a 
custom listener to signal a fatal condition and stop consumption.

Should this behaviour be changed to propagate the exception instead?
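
Until that is settled, one possible workaround is to have the listener record 
its own failure and let the polling thread check for it after each poll(). 
Below is a minimal sketch of that idea, not a definitive implementation. To 
keep it compilable without kafka-clients, RebalanceCallbacks is a hypothetical 
stand-in for the two callbacks of ConsumerRebalanceListener, and 
FailureRecordingListener and throwIfFailed() are names made up for 
illustration. A real wrapper would implement ConsumerRebalanceListener, take 
Collection<TopicPartition>, and probably let WakeupException and 
InterruptException pass through untouched.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the two callbacks of ConsumerRebalanceListener,
// used only so that this sketch compiles without kafka-clients.
interface RebalanceCallbacks {
    void onPartitionsRevoked(Collection<String> partitions);
    void onPartitionsAssigned(Collection<String> partitions);
}

// Wraps the real listener and remembers its first failure, instead of
// relying on the coordinator to propagate the exception.
class FailureRecordingListener implements RebalanceCallbacks {
    private final RebalanceCallbacks delegate;
    private final AtomicReference<Exception> failure = new AtomicReference<>();

    FailureRecordingListener(RebalanceCallbacks delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        try {
            delegate.onPartitionsRevoked(partitions);
        } catch (Exception e) {
            failure.compareAndSet(null, e); // keep only the first failure
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        try {
            delegate.onPartitionsAssigned(partitions);
        } catch (Exception e) {
            failure.compareAndSet(null, e); // keep only the first failure
        }
    }

    // Meant to be called by the polling thread after every poll();
    // rethrows whatever the wrapped listener failed with.
    void throwIfFailed() {
        Exception e = failure.get();
        if (e != null) {
            throw new IllegalStateException(
                "Rebalance listener failed, stopping consumption", e);
        }
    }
}
```

With the real API, the wrapper would be passed to consumer.subscribe(topics, 
listener), and the poll loop would call listener.throwIfFailed() right after 
each consumer.poll(...) and before processing the returned records. Since the 
callbacks run on the polling thread inside poll(), the failure should be 
visible by the time poll() returns.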

Cheers,
Oleg Muravskiy
