Hi, Gaurav,

Thanks for working on the patch for the problem. Could you open a ticket
and PR for the change? The dev mailing list stripping off all attachments
and is hard to follow if the change is not embedded as text in the email.

And to Debraj's comment, yes, we are aware of it and are working on
removing the old consumer usage in Samza code base as the coming release (
https://issues.apache.org/jira/browse/SAMZA-1776).

Thank you all!

On Fri, Aug 24, 2018 at 4:33 AM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> It seems the above issue is coming because of KIP-35
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 109%3A+Old+Consumer+Deprecation>
> &
> its related PR <https://github.com/apache/kafka/pull/2328> (KAFKA-3264
> <https://issues.apache.org/jira/browse/KAFKA-3264>). Filed
> https://issues.apache.org/jira/browse/SAMZA-1822 for this.
>
> On Fri, Aug 24, 2018 at 2:39 PM Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > By patching the samza codebase locally that this error goes away:
> > Patch involves changing the import for OffsetOutOfRangeException class in
> > file
> >
> > *samza/samza-kafka/src/main/scala/org/apache/samza/system/
> kafka/GetOffset.scala*
> > to *import org.apache.kafka.common.errors.OffsetOutOfRangeException*
> >
> > Can you please confirm if this change is good? And if so can a quick
> patch
> > release with it be made available?
> >
> > Independently, does this release needs to be verified for any more such
> > similar errors (possibly due to change in class packages etc.)? Not
> trying
> > to cast aspersions on this release, but just trying to ask the next thing
> > that naturally comes to mind :-)
> >
> > --
> > thanks,
> > gaurav
> >
> >
> > On Thu, Aug 23, 2018 at 7:06 PM Gaurav Agarwal <gauravagarw...@gmail.com
> >
> > wrote:
> >
> > > Few more notes (based on reading a similar thread from few days ago):
> > > - this exception is while initializing offset for the data topic
> > partition
> > > (not samza's checkpoint topic/partition)
> > > - we have manually verified that due to some issue, kafka data-logs
> have
> > > rolled over and the earliest available offset is greater than what
> samza
> > > has in its checkpoint - and hence when samza is querying kafka with the
> > > offset it checkpointed last, it is seeing this error.
> > >
> > > Please let me know if more logs are required.
> > >
> > >
> > > On Thu, Aug 23, 2018 at 4:30 PM Gaurav Agarwal <
> gauravagarw...@gmail.com
> > >
> > > wrote:
> > >
> > >> Hi All,
> > >>
> > >> We are facing identical problem as described in thread
> > >> https://www.mail-archive.com/dev@samza.apache.org/msg06740.html
> > >>
> > >> Here - Samza is requesting for an Kafka partition offset that is too
> old
> > >> (i.e Kafka log has moved ahead). We are setting the property
> > *consumer.auto.offset.reset
> > >> to smallest* and therefore expecting that Samza will reset its
> > >> checkpoint to earliest available partition offset in such a scenario.
> > But
> > >> that is not happening  we are getting exceptions of this form
> > continually:
> > >>
> > >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> > >> kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
> > >> Disconnecting from vrni-platform-release:9092
> > >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> > >> system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating
> > offset
> > >> 56443499 for topic and partition Topic3-0
> > >> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
> > >> system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] -
> > While
> > >> refreshing brokers for Topic3-0:
> > >> org.apache.kafka.common.errors.OffsetOutOfRangeException: The
> requested
> > >> offset is not within the range of offsets maintained by the server..
> > >> Retrying.
> > >>
> > >> *Version Details:*
> > >>
> > >> *Samza: 2.11-0.14.1*
> > >> *Kafka Client: 1.1.0 *
> > >> *Kafka Server: 1.1.0 Scala 2.11 *
> > >>
> > >>
> > >> Browsing through the code, it appears that GetOffset::isValidOffset
> > >> should be able to catch the exception OffsetOutOfRangeException and
> > >> convert it to a false value. But it appears that this not happening.
> > Could
> > >> there be a mismatch in package of the Exception? This class is
> catching
> > the
> > >> exception import kafka.common.OffsetOutOfRangeException, but from
> logs,
> > >> it appears that the package of this class different. Could this be the
> > >> reason?
> > >>
> > >>  def isValidOffset(consumer: DefaultFetchSimpleConsumer,
> > >>> topicAndPartition: TopicAndPartition, offset: String) = {
> > >>
> > >>     info("Validating offset %s for topic and partition %s" format
> > >>> (offset, topicAndPartition))
> > >>
> > >>     try {
> > >>
> > >>       val messages = consumer.defaultFetch((topicAndPartition,
> > >>> offset.toLong))
> > >>
> > >>       if (messages.hasError) {
> > >>
> > >>
> > >>> KafkaUtil.maybeThrowException(messages.error(
> topicAndPartition.topic,
> > >>> topicAndPartition.partition).exception())
> > >>
> > >>       }
> > >>
> > >>       info("Able to successfully read from offset %s for topic and
> > >>> partition %s. Using it to instantiate consumer." format (offset,
> > >>> topicAndPartition))
> > >>
> > >>       true
> > >>
> > >>     } catch {
> > >>
> > >>       case e: OffsetOutOfRangeException => false
> > >>
> > >>     }
> > >>
> > >>   }
> > >>
> > >>
> > >> Also, it Appears that BrokerProxy class - the caller of GetOffset
> would
> > >> print a log ("*It appears that...*") in case it gets a false value,
> but
> > >> it is not logging this line (indicating that some Exception generated
> in
> > >> GetOffset method is going uncaught and being propagated up):
> > >>
> > >>
> > >>   def addTopicPartition(tp: TopicAndPartition, nextOffset:
> > >>> Option[String]) = {
> > >>
> > >>     debug("Adding new topic and partition %s to queue for %s" format
> > (tp,
> > >>> host))
> > >>
> > >>     if (nextOffsets.asJava.containsKey(tp)) {
> > >>
> > >>       toss("Already consuming TopicPartition %s" format tp)
> > >>
> > >>     }
> > >>
> > >>     val offset = if (nextOffset.isDefined &&
> > >>> offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
> > >>
> > >>       nextOffset
> > >>
> > >>         .get
> > >>
> > >>         .toLong
> > >>
> > >>     } else {
> > >>
> > >>       warn("It appears that we received an invalid or empty offset %s
> > for
> > >>> %s. Attempting to use Kafka's auto.offset.reset setting. This can
> > result in
> > >>> data loss if processing continues." format (nextOffset, tp))
> > >>
> > >>       offsetGetter.getResetOffset(simpleConsumer, tp)
> > >>
> > >>     }
> > >>
> > >>     debug("Got offset %s for new topic and partition %s." format
> > (offset,
> > >>> tp))
> > >>
> > >>     nextOffsets += tp -> offset
> > >>
> > >>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
> > >>
> > >>   }
> > >>
> > >>
> > >> *Could this be due to mismatch in Kafka client library version that we
> > >> are using? Is there are commended Kafka client version we should use
> > with
> > >> Samza 0.14.1 (assuming that Kafka server is 1.x)?*
> > >> Any help regarding this will be greatly appreciated.
> > >>
> > >>
> > >> - -
> > >> thanks,
> > >> gaurav
> > >>
> > >>
> >
>

Reply via email to