Few more notes (based on reading a similar thread from few days ago):
- this exception is while initializing offset for the data topic partition
(not samza's checkpoint topic/partition)
- we have manually verified that due to some issue, kafka data-logs have
rolled over and the earliest available offset is greater than what samza
has in its checkpoint - and hence when samza is querying kafka with the
offset it checkpointed last, it is seeing this error.

Please let me know if more logs are required.

> Hi All,
> We are facing identical problem as described in thread
> https://www.mail-archive.com/dev@samza.apache.org/msg06740.html
> Here - Samza is requesting for an Kafka partition offset that is too old
> (i.e Kafka log has moved ahead). We are setting the property 
> *consumer.auto.offset.reset
> to smallest* and therefore expecting that Samza will reset its checkpoint
> to earliest available partition offset in such a scenario. But that is not
> happening  we are getting exceptions of this form continually:
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
> Disconnecting from vrni-platform-release:9092
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset
> 56443499 for topic and partition Topic3-0
> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
> system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While
> refreshing brokers for Topic3-0:
> org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
> offset is not within the range of offsets maintained by the server..
> Retrying.
> *Version Details:*
> *Samza: 2.11-0.14.1*
> *Kafka Client: 1.1.0 *
> *Kafka Server: 1.1.0 Scala 2.11 *
> Browsing through the code, it appears that GetOffset::isValidOffset should
> be able to catch the exception OffsetOutOfRangeException and convert it
> to a false value. But it appears that this not happening. Could there be a
> mismatch in package of the Exception? This class is catching the
> exception import kafka.common.OffsetOutOfRangeException, but from logs,
> it appears that the package of this class different. Could this be the
> reason?
>  def isValidOffset(consumer: DefaultFetchSimpleConsumer,
>> topicAndPartition: TopicAndPartition, offset: String) = {
>     info("Validating offset %s for topic and partition %s" format (offset,
>> topicAndPartition))
>     try {
>       val messages = consumer.defaultFetch((topicAndPartition,
>> offset.toLong))
>       if (messages.hasError) {
>> KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic,
>> topicAndPartition.partition).exception())
>       }
>       info("Able to successfully read from offset %s for topic and
>> partition %s. Using it to instantiate consumer." format (offset,
>> topicAndPartition))
>       true
>     } catch {
>       case e: OffsetOutOfRangeException => false
>     }
>   }
> Also, it Appears that BrokerProxy class - the caller of GetOffset would
> print a log ("*It appears that...*") in case it gets a false value, but
> it is not logging this line (indicating that some Exception generated in
> GetOffset method is going uncaught and being propagated up):
>   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String])
>> = {
>     debug("Adding new topic and partition %s to queue for %s" format (tp,
>> host))
>     if (nextOffsets.asJava.containsKey(tp)) {
>       toss("Already consuming TopicPartition %s" format tp)
>     }
>     val offset = if (nextOffset.isDefined &&
>> offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>       nextOffset
>         .get
>         .toLong
>     } else {
>       warn("It appears that we received an invalid or empty offset %s for
>> %s. Attempting to use Kafka's auto.offset.reset setting. This can result in
>> data loss if processing continues." format (nextOffset, tp))
>       offsetGetter.getResetOffset(simpleConsumer, tp)
>     }
>     debug("Got offset %s for new topic and partition %s." format (offset,
>> tp))
>     nextOffsets += tp -> offset
>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
>   }
> *Could this be due to mismatch in Kafka client library version that we are
> using? Is there are commended Kafka client version we should use with Samza
> 0.14.1 (assuming that Kafka server is 1.x)?*
> Any help regarding this will be greatly appreciated.
