It seems the above issue is caused by KIP-109 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation> (Old Consumer Deprecation) and its related PR <https://github.com/apache/kafka/pull/2328> (KAFKA-3264 <https://issues.apache.org/jira/browse/KAFKA-3264>). I have filed https://issues.apache.org/jira/browse/SAMZA-1822 for this.
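Why a package change is enough to break the check: a type pattern in a Scala `catch` clause matches only instances of that class (or its subclasses), so two exceptions that merely share the simple name `OffsetOutOfRangeException` are unrelated types. The minimal, self-contained sketch below assumes hypothetical stand-in objects `kafkaCommon` and `kafkaClientErrors` in place of the real `kafka.common` and `org.apache.kafka.common.errors` packages, and models the two classes as unrelated. They are not the Kafka classes, and `CatchMismatchDemo` is an illustrative name, not Samza code.

```scala
// Hypothetical stand-ins that only simulate the packages kafka.common and
// org.apache.kafka.common.errors; they are NOT the real Kafka classes.
object kafkaCommon {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object kafkaClientErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object CatchMismatchDemo {
  // Same shape as the unpatched check: only the "old" class is imported,
  // so only that class is matched by the catch clause.
  def isValidOffsetOld(fetch: () => Unit): Boolean = {
    import kafkaCommon.OffsetOutOfRangeException
    try {
      fetch()
      true
    } catch {
      case _: OffsetOutOfRangeException => false
    }
  }

  // Same shape as the locally patched import: the client-side class is caught.
  def isValidOffsetPatched(fetch: () => Unit): Boolean = {
    import kafkaClientErrors.OffsetOutOfRangeException
    try {
      fetch()
      true
    } catch {
      case _: OffsetOutOfRangeException => false
    }
  }

  // Simulates the failure seen in the logs: the fetch throws the
  // org.apache.kafka.common.errors flavour of the exception.
  val failingFetch: () => Unit = () =>
    throw new kafkaClientErrors.OffsetOutOfRangeException(
      "The requested offset is not within the range of offsets maintained by the server.")

  // True if the exception escapes the unpatched check (the reported symptom).
  def oldCheckLeaksException(): Boolean =
    try { isValidOffsetOld(failingFetch); false }
    catch { case _: kafkaClientErrors.OffsetOutOfRangeException => true }
}
```

In the sketch the unpatched check lets the exception propagate instead of returning `false`, which is consistent with the caller never logging its "It appears that..." warning.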
On Fri, Aug 24, 2018 at 2:39 PM Gaurav Agarwal <gauravagarw...@gmail.com> wrote:
> Hi All,
>
> By patching the Samza codebase locally, we found that this error goes away.
> The patch changes the import for the OffsetOutOfRangeException class in the
> file
>
> *samza/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala*
> to *import org.apache.kafka.common.errors.OffsetOutOfRangeException*
>
> Can you please confirm whether this change is good? And if so, can a quick
> patch release with it be made available?
>
> Independently, does this release need to be verified for any more similar
> errors (possibly due to changes in class packages, etc.)? Not trying to cast
> aspersions on this release, just asking the next thing that naturally comes
> to mind :-)
>
> --
> thanks,
> gaurav
>
>
> On Thu, Aug 23, 2018 at 7:06 PM Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
> > A few more notes (based on reading a similar thread from a few days ago):
> > - this exception occurs while initializing the offset for the data topic
> >   partition (not Samza's checkpoint topic/partition)
> > - we have manually verified that, due to some issue, the Kafka data logs
> >   have rolled over and the earliest available offset is greater than what
> >   Samza has in its checkpoint; hence, when Samza queries Kafka with the
> >   offset it last checkpointed, it sees this error.
> >
> > Please let me know if more logs are required.
> >
> >
> > On Thu, Aug 23, 2018 at 4:30 PM Gaurav Agarwal <gauravagarw...@gmail.com>
> > wrote:
> >
> >> Hi All,
> >>
> >> We are facing an identical problem to the one described in the thread
> >> https://www.mail-archive.com/dev@samza.apache.org/msg06740.html
> >>
> >> Here, Samza is requesting a Kafka partition offset that is too old
> >> (i.e., the Kafka log has moved ahead).
> >> We are setting the property *consumer.auto.offset.reset to smallest* and
> >> therefore expect Samza to reset its checkpoint to the earliest available
> >> partition offset in such a scenario. But that is not happening; instead we
> >> continually get exceptions of this form:
> >>
> >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
> >> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
> >> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying.
> >>
> >> *Version Details:*
> >>
> >> *Samza: 2.11-0.14.1*
> >> *Kafka Client: 1.1.0*
> >> *Kafka Server: 1.1.0 Scala 2.11*
> >>
> >>
> >> Browsing through the code, it appears that GetOffset::isValidOffset
> >> should catch the exception OffsetOutOfRangeException and convert it to a
> >> false value. But this does not appear to be happening. Could there be a
> >> mismatch in the package of the exception? This class catches the
> >> exception imported as kafka.common.OffsetOutOfRangeException, but from
> >> the logs, the package of the thrown class appears to be different
> >> (org.apache.kafka.common.errors). Could this be the reason?
> >>
> >>   def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
> >>     info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
> >>
> >>     try {
> >>       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
> >>
> >>       if (messages.hasError) {
> >>         KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
> >>       }
> >>
> >>       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
> >>
> >>       true
> >>     } catch {
> >>       case e: OffsetOutOfRangeException => false
> >>     }
> >>   }
> >>
> >>
> >> Also, it appears that the BrokerProxy class (the caller of GetOffset)
> >> would log a message ("*It appears that...*") if it got a false value, but
> >> it is not logging this line, which indicates that some exception generated
> >> in the GetOffset method is going uncaught and being propagated up:
> >>
> >>
> >>   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
> >>     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
> >>
> >>     if (nextOffsets.asJava.containsKey(tp)) {
> >>       toss("Already consuming TopicPartition %s" format tp)
> >>     }
> >>
> >>     val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
> >>       nextOffset
> >>         .get
> >>         .toLong
> >>     } else {
> >>       warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
> >>
> >>       offsetGetter.getResetOffset(simpleConsumer, tp)
> >>     }
> >>
> >>     debug("Got offset %s for new topic and partition %s." format (offset, tp))
> >>
> >>     nextOffsets += tp -> offset
> >>
> >>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
> >>   }
> >>
> >>
> >> *Could this be due to a mismatch in the Kafka client library version that
> >> we are using? Is there a recommended Kafka client version we should use
> >> with Samza 0.14.1 (assuming that the Kafka server is 1.x)?*
> >> Any help regarding this will be greatly appreciated.
> >>
> >>
> >> --
> >> thanks,
> >> gaurav
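The behaviour the reporter expected is the fallback in `addTopicPartition`: keep the checkpointed offset if it is readable, otherwise fall back to the `auto.offset.reset` policy. The sketch below models that decision under loudly stated assumptions. `FakePartition`, `ResetFallbackSketch` and the stand-in exception objects are hypothetical; the reset policy is reduced to `smallest` (earliest retained offset) versus anything else (log-end offset); and the validity check catches both exception flavours, which is a defensive variant, not the single-import patch proposed in the thread.

```scala
// Hypothetical stand-ins for the two exception classes (NOT the Kafka classes).
object kafkaCommon {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}
object kafkaClientErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

// Hypothetical in-memory partition: offsets in [earliest, logEnd] are fetchable;
// anything else fails with the client-side out-of-range exception.
final case class FakePartition(earliest: Long, logEnd: Long) {
  def fetch(offset: Long): Unit =
    if (offset < earliest || offset > logEnd)
      throw new kafkaClientErrors.OffsetOutOfRangeException(
        s"offset $offset outside [$earliest, $logEnd]")
}

object ResetFallbackSketch {
  // Treats either flavour of OffsetOutOfRangeException as "invalid offset".
  def isValidOffset(p: FakePartition, offset: String): Boolean =
    try { p.fetch(offset.toLong); true }
    catch {
      case _: kafkaCommon.OffsetOutOfRangeException       => false
      case _: kafkaClientErrors.OffsetOutOfRangeException => false
    }

  // Simplified reset policy: "smallest" -> earliest retained, otherwise log end.
  def resetOffset(p: FakePartition, autoOffsetReset: String): Long =
    if (autoOffsetReset == "smallest") p.earliest else p.logEnd

  // Mirrors the branch in addTopicPartition: use the checkpoint if it is
  // readable, otherwise ask the reset policy.
  def startingOffset(p: FakePartition, checkpointed: Option[String], autoOffsetReset: String): Long =
    checkpointed match {
      case Some(o) if isValidOffset(p, o) => o.toLong
      case _                              => resetOffset(p, autoOffsetReset)
    }
}
```

For example, with the checkpointed offset 56443499 from the logs and a hypothetical partition whose earliest retained offset is 60000000, `ResetFallbackSketch.startingOffset(FakePartition(60000000L, 60001000L), Some("56443499"), "smallest")` resolves to 60000000 instead of retrying forever.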