It seems the above issue is caused by KIP-109
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation>
and its related PR <https://github.com/apache/kafka/pull/2328> (KAFKA-3264
<https://issues.apache.org/jira/browse/KAFKA-3264>). Filed
https://issues.apache.org/jira/browse/SAMZA-1822 for this.
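
To illustrate the suspected mismatch, here is a minimal sketch. It does not use the real Kafka or Samza classes: OldApi, NewApi, and OffsetCheckSketch are hypothetical stand-ins for two unrelated exception classes that share the simple name OffsetOutOfRangeException but live in different packages. It only shows that a catch clause bound to one of them does not match the other.

```scala
// Hypothetical stand-ins for kafka.common.* and org.apache.kafka.common.errors.*
object OldApi {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}
object NewApi {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object OffsetCheckSketch {
  // Mirrors a file that imports only the old exception class.
  import OldApi.OffsetOutOfRangeException

  // Returns false only when the *old* exception type is thrown;
  // the new type is not matched and propagates to the caller.
  def isValidOffsetOldImport(fetch: () => Unit): Boolean =
    try { fetch(); true } catch {
      case _: OffsetOutOfRangeException => false
    }

  // One possible variant (an assumption, not necessarily the actual fix):
  // treat either exception type as "offset is not valid".
  def isValidOffsetBoth(fetch: () => Unit): Boolean =
    try { fetch(); true } catch {
      case _: OldApi.OffsetOutOfRangeException |
           _: NewApi.OffsetOutOfRangeException => false
    }
}
```

With these stand-ins, calling isValidOffsetOldImport with a fetch that throws NewApi.OffsetOutOfRangeException lets the exception escape, which would match the behaviour reported below, while isValidOffsetBoth returns false for the same input.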

On Fri, Aug 24, 2018 at 2:39 PM Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> Hi All,
>
> Patching the Samza codebase locally makes this error go away. The patch
> changes the import for the OffsetOutOfRangeException class in the file
>
> *samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala*
> to *import org.apache.kafka.common.errors.OffsetOutOfRangeException*
>
> Can you please confirm if this change is good? And if so can a quick patch
> release with it be made available?
>
> Independently, does this release need to be verified for any more
> similar errors (possibly due to changes in class packages etc.)? Not trying
> to cast aspersions on this release, but just trying to ask the next thing
> that naturally comes to mind :-)
>
> --
> thanks,
> gaurav
>
>
> On Thu, Aug 23, 2018 at 7:06 PM Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
> > A few more notes (based on reading a similar thread from a few days ago):
> > - this exception occurs while initializing the offset for the data topic
> > partition (not Samza's checkpoint topic/partition)
> > - we have manually verified that, due to some issue, the Kafka data logs
> > have rolled over and the earliest available offset is greater than what
> > Samza has in its checkpoint. Hence, when Samza queries Kafka with the
> > offset it last checkpointed, it sees this error.
> >
> > Please let me know if more logs are required.
> >
> >
> > On Thu, Aug 23, 2018 at 4:30 PM Gaurav Agarwal <gauravagarw...@gmail.com>
> > wrote:
> >
> >> Hi All,
> >>
> >> We are facing a problem identical to the one described in this thread:
> >> https://www.mail-archive.com/dev@samza.apache.org/msg06740.html
> >>
> >> Here, Samza is requesting a Kafka partition offset that is too old
> >> (i.e. the Kafka log has moved ahead). We are setting the property
> >> *consumer.auto.offset.reset to smallest* and therefore expect Samza to
> >> reset its checkpoint to the earliest available partition offset in such
> >> a scenario. But that is not happening; we continually get exceptions of
> >> this form:
> >>
> >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> >> kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
> >> Disconnecting from vrni-platform-release:9092
> >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> >> system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating
> >> offset 56443499 for topic and partition Topic3-0
> >> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
> >> system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] -
> >> While refreshing brokers for Topic3-0:
> >> org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
> >> offset is not within the range of offsets maintained by the server..
> >> Retrying.
> >>
> >> *Version Details:*
> >>
> >> *Samza: 2.11-0.14.1*
> >> *Kafka Client: 1.1.0 *
> >> *Kafka Server: 1.1.0 Scala 2.11 *
> >>
> >>
> >> Browsing through the code, it appears that GetOffset::isValidOffset
> >> should catch the OffsetOutOfRangeException and convert it to a false
> >> value, but this is not happening. Could there be a mismatch in the
> >> package of the exception? This class catches the exception imported as
> >> kafka.common.OffsetOutOfRangeException, but from the logs it appears
> >> that the thrown exception is in a different package
> >> (org.apache.kafka.common.errors). Could this be the reason?
> >>
> >>   def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
> >>     info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
> >>     try {
> >>       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
> >>       if (messages.hasError) {
> >>         KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
> >>       }
> >>       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
> >>       true
> >>     } catch {
> >>       case e: OffsetOutOfRangeException => false
> >>     }
> >>   }
> >>
> >>
> >> Also, it appears that the BrokerProxy class (the caller of GetOffset)
> >> would print a log ("*It appears that...*") if it gets a false value, but
> >> it is not logging this line (indicating that some exception generated in
> >> the GetOffset method is going uncaught and being propagated up):
> >>
> >>
> >>   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
> >>     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
> >>     if (nextOffsets.asJava.containsKey(tp)) {
> >>       toss("Already consuming TopicPartition %s" format tp)
> >>     }
> >>     val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
> >>       nextOffset
> >>         .get
> >>         .toLong
> >>     } else {
> >>       warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
> >>       offsetGetter.getResetOffset(simpleConsumer, tp)
> >>     }
> >>     debug("Got offset %s for new topic and partition %s." format (offset, tp))
> >>     nextOffsets += tp -> offset
> >>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
> >>   }
> >>
> >>
> >> *Could this be due to a mismatch in the Kafka client library version that
> >> we are using? Is there a recommended Kafka client version we should use
> >> with Samza 0.14.1 (assuming that the Kafka server is 1.x)?*
> >> Any help regarding this will be greatly appreciated.
> >>
> >>
> >> - -
> >> thanks,
> >> gaurav
> >>
> >>
>
