Hello Matthias
Thanks for looking into this.
I can confirm that a restart worked.
Before this problem happened, we had some cluster issues that are still
being looked into: there were some leader changes and some offset commit
failures.
The consumer should not have lagged that far behind, but I can only verify
that at the next occurrence.
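To check the lag at the next occurrence, the consumer position can be compared with the offsets the partition still retains. The sketch below is a minimal illustration, assuming the three offsets have already been collected; with a plain `KafkaConsumer` they could come from `beginningOffsets`, `endOffsets` and `position`, but how to read the global consumer's position from outside Kafka Streams is left open here. The class `LagCheck` and its members are hypothetical, not part of Kafka.

```java
/**
 * Hypothetical helper (not part of Kafka): compares a consumer position with a
 * partition's retained offset range [logStartOffset, logEndOffset].
 */
class LagCheck {

    /** Where the position lies relative to the retained range. */
    enum Status { IN_RANGE, BELOW_LOG_START, BEYOND_LOG_END }

    /** Result holder: distance to the log end, and where the position lies. */
    static final class Result {
        final long lag;
        final Status status;

        Result(long lag, Status status) {
            this.lag = lag;
            this.status = status;
        }
    }

    static Result check(long position, long logStartOffset, long logEndOffset) {
        long lag = logEndOffset - position;
        if (position < logStartOffset) {
            // The records at this position were already deleted (for example by retention);
            // with auto.offset.reset=none the next fetch fails with OffsetOutOfRangeException.
            return new Result(lag, Status.BELOW_LOG_START);
        }
        if (position > logEndOffset) {
            return new Result(lag, Status.BEYOND_LOG_END);
        }
        // Fetching exactly at the log end is valid; it just returns no records yet.
        return new Result(lag, Status.IN_RANGE);
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not taken from the incident.
        Result r = check(1_000L, 1_500L, 4_000L);
        System.out.println("lag=" + r.lag + " status=" + r.status);
    }
}
```

A large lag alone is harmless; the failure only appears once the position drops below the log start offset.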

Does the user have any solution available other than restarting?
I understand the intention to "notify" the user of a potential problem, but
if nothing can be done about the data loss, then a warning message and
automatic recovery should not make things worse.
This would make sense as an improvement. As I understand that this is not a
bug, the case is closed for me for now.
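The "warning message and automatic recovery" idea could, outside of Kafka Streams itself, be approximated by a supervisor that logs the failure and starts a fresh instance. The sketch below is generic: `RestartingSupervisor` and `App` are hypothetical names, not a Kafka API. Wiring it to Kafka Streams would need a way to detect the failure (for example a listener registered with `KafkaStreams.setStateListener`) and would have to build a new `KafkaStreams` object each time, because an instance cannot be started again once it has been closed. As Matthias notes in his reply, restarting this way can hide data loss, so the logged warning becomes the only signal.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

/**
 * Hypothetical sketch (not a Kafka API): run an application and, if it fails,
 * log a warning and start a fresh instance, up to a fixed number of restarts.
 */
class RestartingSupervisor {

    private static final Logger LOG = Logger.getLogger(RestartingSupervisor.class.getName());

    /** Hypothetical stand-in for an application that runs until done or throws. */
    interface App {
        void run();
    }

    /**
     * Runs apps built by the factory and returns the number of attempts used.
     * Rethrows the last failure once maxRestarts restarts have been used up.
     */
    static int runWithRestarts(Supplier<App> factory, int maxRestarts) {
        int attempt = 0;
        while (true) {
            attempt++;
            App app = factory.get(); // always a new instance; a failed one is never reused
            try {
                app.run();
                return attempt;
            } catch (RuntimeException e) {
                if (attempt > maxRestarts) {
                    throw e; // give up and surface the error to the operator
                }
                LOG.warning("Attempt " + attempt + " failed (" + e.getMessage() + "), restarting");
            }
        }
    }

    public static void main(String[] args) {
        int[] failuresLeft = {2};
        int attempts = runWithRestarts(() -> () -> {
            if (failuresLeft[0]-- > 0) {
                throw new IllegalStateException("simulated offset out of range");
            }
        }, 3);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```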

Thanks again and best regards
Patrik

On Thu, 4 Oct 2018 at 02:58, Matthias J. Sax <matth...@confluent.io> wrote:

> I double-checked the code and discussed it with a colleague.
>
> There are two places where we call `globalConsumer.poll()`:
>
> 1. On startup, when we need to bootstrap the store. In this case, we
> catch the exception and handle it.
> 2. During regular processing. In this case, we don't catch the exception.
>
> The reasoning is the following: for case (1), the exception should only
> happen if you start a new application or if an application was offline
> for a long time. This is fine, and we just make sure to bootstrap
> correctly. For case (2), the consumer is at the end of the log, and thus
> an InvalidOffsetException should never occur; if it does, it indicates
> an issue the user should be notified about.
>
> Does this reasoning make sense?
>
> Question: if you restart your application, does it fail again? Or does
> it resume processing?
>
> It would be good to understand the root cause. It seems your
> globalConsumer is lagging behind? Can you verify this? If so, it seems
> to make sense to stop processing to inform the user about this issue.
> Would you rather have the application just move on, implying silent
> data loss?
>
>
> -Matthias
>
>
> On 10/3/18 12:20 AM, Patrik Kleindl wrote:
> > Hello Matthias
> > Thank you for the explanation.
> >
> > Version used is 2.0.0-cp1
> >
> > The stacktrace:
> > 2018-10-02 10:51:52,575 ERROR
> > [org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer]
> > (...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread) -
> > [short-component-name:; transaction-id:; user-id:; creation-time:]
> > global-stream-thread
> > [...-077dce27-40fe-47bd-86dd-1615395782af-GlobalStreamThread] Updating
> > global state failed. You can restart KafkaStreams to recover from this
> > error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> > Offsets out of range with no configured reset policy for partitions:
> > {...=51247974}
> >     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
> >     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
> >     at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
> >     at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290)
> >
> > Fetcher.parseCompletedFetch:
> >
> >     else if (error == Errors.OFFSET_OUT_OF_RANGE) {
> >         if (fetchOffset != subscriptions.position(tp)) {
> >             log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
> >                     "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
> >         } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
> >             log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
> >             subscriptions.requestOffsetReset(tp);
> >         } else {
> >             throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
> >         }
> >     }
> >
> > So this means that for the global and restore consumers, the exception
> > will always be thrown unless there is some special handling?
> >
> > best regards
> >
> > Patrik
> >
> > On Tue, 2 Oct 2018 at 22:26, Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Setting the reset policy to "none" is by design
> >> (https://issues.apache.org/jira/browse/KAFKA-6121), and by design it
> >> cannot be overridden (there might be a workaround for you, though).
> >> However, Streams should not die but should catch the exception and
> >> recover from it automatically.
> >>
> >> Which version do you use? Can you share the full stack trace so we can
> >> see why Streams failed to recover from this exception?
> >>
> >>
> >> -Matthias
> >>
> >> On 10/2/18 4:54 AM, Patrik Kleindl wrote:
> >>> Hi
> >>>
> >>> We had several incidents where a Streams application crashed while
> >>> maintaining a global state store:
> >>> Updating global state failed. You can restart KafkaStreams to recover from
> >>> this error.: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> >>> Offsets out of range with no configured reset policy for partitions: ...
> >>>
> >>> As we never set this to "none", I checked the code and found that
> >>> StreamsConfig getGlobalConsumerConfigs and getRestoreConsumerConfigs both
> >>> set this explicitly:
> >>> baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
> >>>
> >>> The logs confirm this:
> >>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
> >>> (ServerService Thread Pool -- 70) - [short-component-name:;
> >>> transaction-id:; user-id:; creation-time:]  Kafka version : 2.0.0-cp1
> >>> 2018-10-02 11:07:06,057 INFO [org.apache.kafka.common.utils.AppInfoParser]
> >>> (ServerService Thread Pool -- 70) - [short-component-name:;
> >>> transaction-id:; user-id:; creation-time:]  Kafka commitId :
> >>> a8c648ff08b9235d
> >>> 2018-10-02 11:07:06,104 INFO
> >>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
> >>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
> >>> creation-time:]  ConsumerConfig values:
> >>> auto.commit.interval.ms = 5000
> >>> auto.offset.reset = none
> >>> bootstrap.servers = [...]
> >>> check.crcs = true
> >>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-global-consumer
> >>>
> >>> ...
> >>>
> >>> 2018-10-02 11:07:06,418 INFO
> >>> [org.apache.kafka.streams.processor.internals.StreamThread] (ServerService
> >>> Thread Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
> >>> creation-time:]  stream-thread
> >>> [...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1] Creating restore
> >>> consumer client
> >>> 2018-10-02 11:07:06,419 INFO
> >>> [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread
> >>> Pool -- 72) - [short-component-name:; transaction-id:; user-id:;
> >>> creation-time:]  ConsumerConfig values:
> >>> auto.commit.interval.ms = 5000
> >>> auto.offset.reset = none
> >>> bootstrap.servers = [...]
> >>> check.crcs = true
> >>> client.id = ...-3f809a8a-3915-4ae4-9a37-f7a392e3dff3-StreamThread-1-restore-consumer
> >>>
> >>> Is this intentional, and if so, why can this not use the default policy
> >>> and recover?
> >>>
> >>> best regards
> >>>
> >>> Patrik
> >>>
> >>
> >>
> >
>
>
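Matthias's distinction between the two calls to `globalConsumer.poll()` can be illustrated with a toy model. In the sketch below, `GlobalStateModel`, `Partition`, `bootstrap` and `regularPoll` are hypothetical names, not Kafka Streams classes, and rebuilding from the log start during bootstrap is an assumed simplification of "we catch the exception and handle it". What it shows: a checkpoint that fell behind retention is recoverable at startup, while the same condition during regular processing is surfaced to the caller.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Kafka Streams code) of the two poll sites described in
 * the thread: bootstrap handles an invalid offset, regular processing does not.
 */
class GlobalStateModel {

    /** Hypothetical stand-in for Kafka's OffsetOutOfRangeException. */
    static class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(long offset) {
            super("Offset out of range with no reset policy: " + offset);
        }
    }

    /** A toy partition whose oldest records may already have been deleted. */
    static class Partition {
        final long logStartOffset;
        final long logEndOffset;

        Partition(long logStartOffset, long logEndOffset) {
            this.logStartOffset = logStartOffset;
            this.logEndOffset = logEndOffset;
        }

        /** Returns the offsets readable from position, behaving like reset policy "none". */
        List<Long> fetch(long position) {
            if (position < logStartOffset || position > logEndOffset) {
                throw new OffsetOutOfRange(position);
            }
            List<Long> offsets = new ArrayList<>();
            for (long o = position; o < logEndOffset; o++) {
                offsets.add(o);
            }
            return offsets;
        }
    }

    /** Case (1), bootstrap: an invalid checkpoint is handled by re-reading from the log start. */
    static int bootstrap(Partition p, long checkpointOffset) {
        try {
            return p.fetch(checkpointOffset).size();
        } catch (OffsetOutOfRange e) {
            // Assumed simplification: drop the old state and rebuild from what is still retained.
            return p.fetch(p.logStartOffset).size();
        }
    }

    /** Case (2), regular processing: the exception is not caught and reaches the caller. */
    static int regularPoll(Partition p, long position) {
        return p.fetch(position).size();
    }

    public static void main(String[] args) {
        Partition p = new Partition(100, 400);
        System.out.println("bootstrap from stale checkpoint read " + bootstrap(p, 50) + " records");
        try {
            regularPoll(p, 50);
        } catch (OffsetOutOfRange e) {
            System.out.println("regular processing failed: " + e.getMessage());
        }
    }
}
```

With a partition retaining offsets 100 to 399, a stale checkpoint at 50 makes `bootstrap` re-read 300 records, while `regularPoll` from offset 50 throws.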

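The quoted `Fetcher.parseCompletedFetch` excerpt, combined with the "none" reset policy that Streams sets by design (KAFKA-6121), explains why the exception surfaces. The hypothetical `ResetDecision` class below is a simplified model of that branch, not Kafka's code: a stale response is ignored, a configured `earliest` or `latest` policy moves the position, and `none` throws.

```java
/**
 * Hypothetical, simplified model (not the real Fetcher) of how the quoted
 * Fetcher.parseCompletedFetch excerpt reacts to an OFFSET_OUT_OF_RANGE error.
 */
class ResetDecision {

    /** Mirrors the values of the consumer's auto.offset.reset setting. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /** Hypothetical stand-in for OffsetOutOfRangeException. */
    static class NoResetPolicyException extends RuntimeException {
        NoResetPolicyException(long fetchOffset) {
            super("Offset out of range with no configured reset policy: " + fetchOffset);
        }
    }

    /** Returns the position the consumer continues from after the error. */
    static long onOutOfRange(long fetchOffset, long currentPosition,
                             long logStartOffset, long logEndOffset, ResetPolicy policy) {
        if (fetchOffset != currentPosition) {
            // Stale response for an offset the consumer has already moved away from: ignore it.
            return currentPosition;
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // "none", which per this thread StreamsConfig sets for the global and restore consumers.
                throw new NoResetPolicyException(fetchOffset);
        }
    }

    public static void main(String[] args) {
        System.out.println(onOutOfRange(50, 50, 100, 400, ResetPolicy.EARLIEST));
        try {
            onOutOfRange(50, 50, 100, 400, ResetPolicy.NONE);
        } catch (NoResetPolicyException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

One simplification: in the real consumer, `requestOffsetReset` only marks the partition for a reset and the new offset is looked up from the broker afterwards, whereas the model returns it directly.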