Hi Gaurav (and others in this thread),

My apologies for the late reply. This e-mail appears to have missed my
inbox somehow.

From your logs, it appears that there was a leadership change on the
Kafka side for this topic partition. If so, I would actually expect the
follower's offset to trail the leader's offset.

If you want to avoid this behavior, you can set *acks = -1* in your produce
request. That will ensure that the produce request succeeds only once all
replicas in the ISR have acknowledged the write. This gives you higher
durability guarantees at the cost of increased latency.
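For reference, here is a minimal sketch of what that looks like on the producer side (plain java.util.Properties so it runs without the Kafka client jar; the broker address and retry count are placeholders, not values from your setup):

```java
import java.util.Properties;

public class ProducerAcksConfig {
    // Build producer settings with acks=-1 (equivalent to acks=all): the broker
    // acknowledges a produce request only after every replica in the ISR has the
    // write, so a leadership change cannot discard acknowledged messages.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "platform1:9092"); // placeholder broker address
        props.put("acks", "-1");                          // wait for the full ISR
        props.put("retries", "3");                        // retry transient failures such as leader elections
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("acks")); // prints -1
    }
}
```

These same keys go into the producer section of your Samza system config if you produce through Samza rather than a raw Kafka producer.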

Hope that helps.

Best,
Jagadish

On Tue, May 2, 2017 at 11:44 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> Some more logs from Kafka:
>
> WARN [2017-05-01 15:21:19,132]
> kafka.server.ReplicaFetcherThread:[Logging$class:warn:83] -
> [ReplicaFetcherThread-0-3] - [ReplicaFetcherThread-0-3], Replica 0 for
> partition [Topic3,17] reset its fetch offset from 45039137 to current
> leader 3's latest offset 45039132
>
>
> INFO [2017-05-01 15:21:19,150] kafka.log.Log:[Logging$class:info:68] -
> [ReplicaFetcherThread-0-3] - Truncating log Topic3-17 to offset 45039132.
>
>
> ERROR [2017-05-01 15:21:19,248]
> kafka.server.ReplicaFetcherThread:[Logging$class:error:97] -
> [ReplicaFetcherThread-0-3] - [ReplicaFetcherThread-0-3], Current offset
> 45039137 for partition [Topic3,17] out of range; reset offset to 45039132
>
>
> So it appears that Samza does not have an issue here. It was Kafka itself
> that moved the offsets back, leading to an out-of-range offset query by
> Samza.
>
>
> We now need to dig a bit more into Kafka!
>
> On Tue, May 2, 2017 at 11:35 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
> > Looking further, the reason for this "jump back" seems not so
> > straightforward. In Kafka's SimpleConsumer code:
> >
> > private def sendRequest(request: RequestOrResponse): NetworkReceive = {
> >   lock synchronized {
> >     var response: NetworkReceive = null
> >     try {
> >       getOrMakeConnection()
> >       blockingChannel.send(request)
> >       response = blockingChannel.receive()
> >     } catch {
> >       case e : ClosedByInterruptException =>
> >         throw e
> >       // Should not observe this exception when running Kafka with Java 1.8
> >       case e: AsynchronousCloseException =>
> >         throw e
> >       case e : Throwable =>
> >         info("Reconnect due to error:", e)
> >         // retry once
> >         try {
> >           reconnect()
> >           blockingChannel.send(request)
> >           response = blockingChannel.receive()
> >         } catch {
> >           case e: Throwable =>
> >             disconnect()
> >             throw e
> >         }
> >     }
> >     response
> >   }
> > }
> >
> >
> > Note that the first exception is logged (which is what we see), but the
> > Kafka client retries once and rethrows whatever exception the retry
> > produced. It could be that this second exception was an instance of
> > OffsetOutOfRangeException, which caused Samza to think that the offset is
> > invalid.
> >
> > However, I am unable to understand what could have caused this. Did Kafka
> > return this exception accidentally, or did Samza ask for an offset that
> > was beyond what was present in the Kafka queue?
> >
> >
> > On Tue, May 2, 2017 at 9:31 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
> > wrote:
> >
> >> This also seems somewhat related to the mail on this group a few days
> >> back with subject 'Messages lost after broker failure'.
> >>
> >> If someone had set auto.offset.reset to largest, the reverse would
> >> happen - i.e. Samza skipping over part of the Kafka partition queue in
> >> the face of such failures.
> >>
> >> On Tue, May 2, 2017 at 9:17 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
> >> wrote:
> >>
> >>> Hi All,
> >>>
> >>> We recently observed an issue in our Samza application (version
> >>> 0.12.0) where we found that the message offsets "jumped back", causing
> >>> many of the messages to be re-processed.
> >>>
> >>> On digging deeper - here is what we found:
> >>>
> >>> - There were some network-related issues at the time, causing
> >>> temporary communication losses between nodes.
> >>> - BrokerProxy tried addTopicPartition(), and while doing that its
> >>> isValidOffset() method failed.
> >>> - This caused it to reset the offset using the configured
> >>> setting, which we have set to auto.offset.reset=smallest.
> >>>
> >>> Now, the validation check simply tries to fetch the message at the given
> >>> offset, and if there is an exception while doing so, it assumes that the
> >>> offset is invalid. I think ReplicaNotAvailableException is being
> >>> considered harmless, but I am not sure what the expected behavior is in
> >>> case of any other exception. What about network-related exceptions (that
> >>> are transient)?
> >>>
> >>> I am pasting below the logs that show a *simple network error,* and
> >>> immediately after it the message where Samza decides that the offset was
> >>> invalid and hence falls back to the reset setting.
> >>>
> >>> Is there any workaround to get past this problem? I would have thought
> >>> that, ideally, there would be only a handful of well-known error codes
> >>> that indicate that an offset into a Kafka topic/partition is invalid.
> >>>
> >>> --
> >>> thanks,
> >>> gaurav
> >>>
> >>>
> >>> INFO [2017-05-01 15:19:27,327] [U:1,898,F:1,159,T:3,056,M:3,056]
> >>> system.kafka.DefaultFetchSimpleConsumer:[Logging$class:info:76] -
> >>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> >>> client samza_consumer-job4-1] - Reconnect due to error:
> >>> java.io.EOFException
> >>>         at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
> >>>         at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
> >>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
> >>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
> >>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
> >>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
> >>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
> >>>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.fetch(DefaultFetchSimpleConsumer.scala:48)
> >>>         at org.apache.samza.system.kafka.DefaultFetchSimpleConsumer.defaultFetch(DefaultFetchSimpleConsumer.scala:41)
> >>>         at org.apache.samza.system.kafka.GetOffset.isValidOffset(GetOffset.scala:60)
> >>>         at org.apache.samza.system.kafka.BrokerProxy.addTopicPartition(BrokerProxy.scala:99)
> >>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.refresh$1(KafkaSystemConsumer.scala:213)
> >>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:226)
> >>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anonfun$refreshBrokers$2.apply(KafkaSystemConsumer.scala:192)
> >>>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> >>>         at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:191)
> >>>         at org.apache.samza.system.kafka.KafkaSystemConsumer$$anon$1.abdicate(KafkaSystemConsumer.scala:293)
> >>>         at org.apache.samza.system.kafka.BrokerProxy.abdicate(BrokerProxy.scala:207)
> >>>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
> >>>         at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$2.apply(BrokerProxy.scala:245)
> >>>         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> >>>         at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:186)
> >>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:147)
> >>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1$$anonfun$run$1.apply(BrokerProxy.scala:134)
> >>>         at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
> >>>         at org.apache.samza.system.kafka.BrokerProxy$$anon$1.run(BrokerProxy.scala:133)
> >>>         at java.lang.Thread.run(Thread.java:745)
> >>>
> >>> WARN [2017-05-01 15:19:27,378] [U:1,904,F:1,153,T:3,056,M:3,056]
> >>> system.kafka.BrokerProxy:[Logging$class:warn:74] -
> >>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> >>> client samza_consumer-job4-1] - It appears that we received an invalid
> >>> or empty offset Some(45039137) for [Topic3,17]. Attempting to use Kafka's
> >>> auto.offset.reset setting. This can result in data loss if processing
> >>> continues.
> >>> INFO [2017-05-01 15:19:27,398] [U:1,904,F:1,152,T:3,056,M:3,056]
> >>> system.kafka.KafkaSystemConsumer:[Logging$class:info:63] -
> >>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform3:9092 for
> >>> client samza_consumer-job4-1] - Refreshing brokers for:
> >>> Map([Topic3,17] -> 45039137)
> >>> INFO [2017-05-01 15:19:27,405] [U:1,904,F:1,152,T:3,056,M:3,056]
> >>> system.kafka.GetOffset:[Logging$class:info:63] -
> >>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> >>> client samza_consumer-job4-1] - Checking if auto.offset.reset is
> >>> defined for topic Topic3
> >>> INFO [2017-05-01 15:19:27,432] [U:1,904,F:1,152,T:3,056,M:3,056]
> >>> system.kafka.GetOffset:[Logging$class:info:63] -
> >>> [SAMZA-BROKER-PROXY-BrokerProxy thread pointed at platform1:9092 for
> >>> client samza_consumer-job4-1] - Got reset of type smallest.
> >>>
> >>> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
> >>>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
> >>>   if (nextOffsets.containsKey(tp)) {
> >>>     toss("Already consuming TopicPartition %s" format tp)
> >>>   }
> >>>
> >>>   val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
> >>>     nextOffset
> >>>       .get
> >>>       .toLong
> >>>   } else {
> >>>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
> >>>     offsetGetter.getResetOffset(simpleConsumer, tp)
> >>>   }
> >>>
> >>>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
> >>>   nextOffsets += tp -> offset
> >>>   metrics.topicPartitions(host, port).set(nextOffsets.size)
> >>> }
> >>>
> >>> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
> >>>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
> >>>
> >>>   try {
> >>>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
> >>>
> >>>     if (messages.hasError) {
> >>>       KafkaUtil.maybeThrowException(messages.errorCode(topicAndPartition.topic, topicAndPartition.partition))
> >>>     }
> >>>
> >>>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
> >>>
> >>>     true
> >>>   } catch {
> >>>     case e: OffsetOutOfRangeException => false
> >>>   }
> >>> }
> >>>
> >>>
> >>
> >
>



-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
