Hi,

I think we are seeing a similar error too where Samza is getting stuck on
producing to Kafka. This was on our sandbox environment and not production,
we had all Kafka instances (3) go away briefly then re-appear with
different ip address -> broker mappings. I know this is not ideal and we're
fixing that separately, but I would have expected Samza/Producer to recover.

What happened instead is that Samza (via the Kafka Producer client) got
stuck in a loop outputting "org.apache.kafka.common.network.Selector: Error
in I/O with <HOST>"

So I think there might be a bug in the Kafka client where it fails to
discover these new hosts? But Samza also does not seem to fail that quickly
when it is stuck. We had to manually restart the Samza task for this to be
fixed.

I've put a full stack trace
https://gist.github.com/danharvey/3909d731f130a0fe1aad

We also have a non-Samza Kafka consumer and that coped fine with the
restart. We are using Kafka 0.8.2.1 and Samza 0.9.0.

Thanks,
Dan



On Mon, 4 May 2015 at 14:37 Garry Turkington <
g.turking...@improvedigital.com> wrote:

> Hi Guozhang,
>
> Honestly snappy is so hugely more performant for me that I have rarely
> used anything but it. So I've only seen the problem using snappy but since
> I use it all the time that may or may not be a valid data point.
>
> I did find some old logs from what happens client side when I see this
> happen.  First a series of communication errors along the lines of the
> following which I think were due to a broker bouncing or timing out:
>
> WARN Got error produce response with correlation id 331092 on
> topic-partition <topic-name>-16, retrying (4 attempts left). Error:
> NOT_LEADER_FOR_PARTITION 2015-04-15 13:54:13,890
> (org.apache.kafka.clients.producer.internals.Sender)
>
> But then the client dies with:
>
> java: target/snappy-1.1.1/snappy.cc:423: char*
> snappy::internal::CompressFragment(const char*, size_t, char*,
> snappy::uint16*, int): Assertion `0 == memcmp(base, candidate, matched)'
> failed.
>
> I'll try and get some better traces and post over on the kafka list. But
> it'll be after Strata this week.
>
> Cheers
> Garry
>
>
> -----Original Message-----
> From: Guozhang Wang [mailto:wangg...@gmail.com]
> Sent: 04 May 2015 00:38
> To: dev@samza.apache.org
> Subject: Re: Errors and hung job on broker shutdown
>
> Garry,
>
> Just wondering, does this error not exist with Gzip compression? Or you
> could see it with any compression schemes?
>
> Guozhang
>
> On Sun, May 3, 2015 at 2:32 AM, Garry Turkington <
> g.turking...@improvedigital.com> wrote:
>
> > Hi,
> >
> > Just to add another data point, I've been occasionally seeing the
> > first error with a non-Samza app using the new Kafka producer with
> > Snappy compression. I was going to post to the Kafka list but I
> > haven't really narrowed down the situations yet. It sort of looks like
> > it most often happens to me some minutes after a broker has restarted
> > or had its ZK session time out in periods of very heavy load. But I
> > need do more troubleshooting to have something less  vague to report
> over there.
> >
> > Garry
> >
> > -----Original Message-----
> > From: Guozhang Wang [mailto:wangg...@gmail.com]
> > Sent: 01 May 2015 23:57
> > To: dev@samza.apache.org
> > Subject: Re: Errors and hung job on broker shutdown
> >
> > Hmm, it seems your snappy compressed data is corrupted and hence keep
> > getting rejected by the broker, hence keeping the producer blocked on
> > close(). Not sure how this happens as I have not seen this error ever
> > before (myself wrote the new Kafka producer's compression module, and
> > have ran it with various kinds of unit / integration test cases, but
> > did not see this coming)..
> >
> > Guozhang
> >
> > On Wed, Apr 29, 2015 at 11:37 PM, Roger Hoover
> > <roger.hoo...@gmail.com>
> > wrote:
> >
> > > Guozhang and Yan,
> > >
> > > Thank you both for your responses.  I tried a lot of combinations
> > > and I think I've determined that it's new producer + snappy that
> > > causes the issue.
> > >
> > > It never happens with the old producer and it never happens with lz4
> > > or no compression.  It only happens when a broker gets restarted (or
> > > maybe just shutdown).
> > >
> > > The error is not always the same.  I've noticed at least three types
> > > of errors on the Kafka brokers.
> > >
> > > 1) java.io.IOException: failed to read chunk at
> > >
> > > org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.j
> > > av
> > > a:356)
> > > http://pastebin.com/NZrrEHxU
> > > 2) java.lang.OutOfMemoryError: Java heap space
> > >    at
> > >
> > > org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.j
> > > av
> > > a:346)
> > > http://pastebin.com/yuxk1BjY
> > > 3) java.io.IOException: PARSING_ERROR(2)
> > >   at
> > > org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
> > > http://pastebin.com/yq98Hx49
> > >
> > > I've noticed a couple different behaviors from the Samza
> > > producer/job
> > > A) It goes into a long retry loop where this message is logged.  I
> > > saw this with error #1 above.
> > >
> > > 2015-04-29 18:17:31 Sender [WARN] task[Partition 7]
> > > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,7]
> > > offset[9999253] Got error produce response with correlation id 4878
> > > on topic-partition svc.call.w_deploy.T2UDe2PWRYWcVAAAhMOAwA-1,
> > > retrying (2147483646 attempts left). Error: CORRUPT_MESSAGE
> > >
> > > B) The job exists with
> > > org.apache.kafka.common.errors.UnknownServerException (at least when
> > > run as ThreadJob).  I saw this with error #3 above.
> > >
> > > org.apache.samza.SamzaException: Unable to send message from
> > > TaskName-Partition 6 to system kafka.
> > > org.apache.kafka.common.errors.UnknownServerException: The server
> > > experienced an unexpected error when processing the request
> > >
> > > This seems most likely to be a bug in the new Kafka producer.  I'll
> > > probably file a JIRA for that project.
> > >
> > > Thanks,
> > >
> > > Roger
> > >
> > > On Wed, Apr 29, 2015 at 7:38 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > And just to answer your first question: SIGTERM with
> > > > controlled.shutdown=true should be OK for bouncing the broker.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Apr 29, 2015 at 7:36 PM, Guozhang Wang
> > > > <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Roger,
> > > > >
> > > > > I believe Samza 0.9.0 already uses the Java producer.
> > > > >
> > > > > Java producer's close() call will try to flush all buffered data
> > > > > to the brokers before completing the call. However, if some
> > > > > buffered data's destination partition leader is not known, the
> > > > > producer will block on refreshing the metadata and then retry
> > sending.
> > > > >
> > > > > From the broker logs, it seems it does receive the producer
> > > > > request but failed to handle it due to "Leader not local" after
> > > > > the
> > bounce:
> > > > >
> > > > > --------
> > > > > [2015-04-28 14:26:44,729] WARN [KafkaApi-0] Produce request with
> > > > > correlation id 226 from client
> > > > > samza_producer-svc_call_w_deploy_to_json-1-1430244278081-3 on
> > > > > partition [sys.samza_metrics,0] failed due to Leader not local
> > > > > for partition [sys.samza_metrics,0] on broker 0
> > > > > (kafka.server.KafkaApis)
> > > > > [2015-04-28 14:26:47,426] WARN [KafkaApi-0] Produce request with
> > > > > correlation id 45671 from client
> > > > > samza_checkpoint_manager-svc_call_join_deploy-1-1429911482243-4
> > > > > on partition
> > > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0]
> > > failed
> > > > > due to Leader not local for partition
> > > > > [__samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0] on
> > > > > broker
> > > > > 0
> > > > > (kafka.server.KafkaApis)
> > > > > [2015-04-28 14:27:24,578] WARN [KafkaApi-0] Produce request with
> > > > > correlation id 12267 from client
> > > > > samza_producer-svc_call_join_deploy-1-1429911471254-0 on
> > > > > partition [sys.samza_metrics,0] failed due to Leader not local
> > > > > for partition [sys.samza_metrics,0] on broker 0
> > > > > (kafka.server.KafkaApis)
> > > > > --------
> > > > >
> > > > > because for these two topic-partitions (sys.samza_metrics,0 and
> > > > > __samza_checkpoint_ver_1_for_svc-call-join-deploy_1,0), their
> > > > > lead has
> > > > been
> > > > > moved to broker id:1,host:sit320w80m7,port:9092. When the
> > > > > producer gets
> > > > the
> > > > > error code from the old leader, it should refresh its metadata
> > > > > and get
> > > > the
> > > > > new leader as broker-1, and retry sending, but for some reason
> > > > > it does
> > > > not
> > > > > refresh its metadata. Without producer logs from Samza container
> > > > > I
> > > cannot
> > > > > further investigate the issue.
> > > > >
> > > > > Which Kafka version does Samza 0.9.0 use?
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, Apr 29, 2015 at 4:30 PM, Yan Fang <yanfang...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Not sure about the Kafka side. From the Samza side, from your
> > > > >> description ( "does not exit nor does it make any progress" ),
> > > > >> I think the code is stuck
> > > in
> > > > >> producer.close
> > > > >> <
> > > > >>
> > > >
> > > https://github.com/apache/samza/blob/master/samza-kafka/src/main/sca
> > > la
> > > /org/apache/samza/system/kafka/KafkaSystemProducer.scala#L143
> > > > >> >,
> > > > >> otherwise, it will throw SamzaException to quit the job. So
> > > > >> maybe some Kafka experts in this mailing list or Kafka mailing
> > > > >> list can help
> > > > >>
> > > > >> Fang, Yan
> > > > >> yanfang...@gmail.com
> > > > >>
> > > > >> On Tue, Apr 28, 2015 at 5:35 PM, Roger Hoover
> > > > >> <roger.hoo...@gmail.com
> > > >
> > > > >> wrote:
> > > > >>
> > > > >> > At error level logging, this was the only entry in the Samza
> log:
> > > > >> >
> > > > >> > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR]
> > > > >> > task[Partition 2]
> > > > >> > ssp[kafka,svc.call.w_deploy.c7tH4YaiTQyBEwAAhQzRXw,2]
> > > offset[9129395]
> > > > >> > Unable to send message from TaskName-Partition 1 to system
> > > > >> > kafka
> > > > >> >
> > > > >> > Here is the log from the Kafka broker that was shutdown.
> > > > >> >
> > > > >> > http://pastebin.com/afgmLyNF
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Roger
> > > > >> >
> > > > >> >
> > > > >> > On Tue, Apr 28, 2015 at 3:49 PM, Yi Pan <nickpa...@gmail.com>
> > > wrote:
> > > > >> >
> > > > >> > > Roger, could you paste the full log from Samza container?
> > > > >> > > If you
> > > can
> > > > >> > figure
> > > > >> > > out which Kafka broker the message was sent to, it would be
> > > helpful
> > > > >> if we
> > > > >> > > get the log from the broker as well.
> > > > >> > >
> > > > >> > > On Tue, Apr 28, 2015 at 3:31 PM, Roger Hoover <
> > > > roger.hoo...@gmail.com
> > > > >> >
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hi,
> > > > >> > > >
> > > > >> > > > I need some help figuring out what's going on.
> > > > >> > > >
> > > > >> > > > I'm running Kafka 0.8.2.1 and Samza 0.9.0 on YARN.  All
> > > > >> > > > the
> > > topics
> > > > >> have
> > > > >> > > > replication factor of 2.
> > > > >> > > >
> > > > >> > > > I'm bouncing the Kafka broker using SIGTERM (with
> > > > >> > > > controlled.shutdown.enable=true).  I see the Samza job
> > > > >> > > > log this
> > > > >> message
> > > > >> > > and
> > > > >> > > > then hang (does not exit nor does it make any progress).
> > > > >> > > >
> > > > >> > > > 2015-04-28 14:28:25 KafkaSystemProducer [ERROR]
> > > > >> > > > task[Partition
> > > 2]
> > > > >> > > > ssp[kafka,my-topic,2] offset[9129395] Unable to send
> > > > >> > > > message
> > > from
> > > > >> > > > TaskName-Partition 1 to system kafka
> > > > >> > > >
> > > > >> > > > The Kafka consumer (Druid Real-Time node) on the other
> > > > >> > > > side then
> > > > >> barfs
> > > > >> > on
> > > > >> > > > the message:
> > > > >> > > >
> > > > >> > > > Exception in thread "chief-svc-perf"
> > > > >> > > kafka.message.InvalidMessageException:
> > > > >> > > > Message is corrupt (stored crc = 1792882425, computed crc
> > > > >> > > > =
> > > > >> 3898271689)
> > > > >> > > > at kafka.message.Message.ensureValid(Message.scala:166)
> > > > >> > > > at
> > > > >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala
> > > > >> :1
> > > > >> 01)
> > > > >> > > > at
> > > > >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala
> > > > >> :3
> > > > >> 3)
> > > > >> > > > at
> > > > >> > >
> > > > >>
> > > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala
> > > :6
> > > 6)
> > > > >> > > > at
> > > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
> > > > >> > > > at
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > > io.druid.firehose.kafka.KafkaEightFirehoseFactory$1.hasMore(KafkaEig
> > > ht
> > > FirehoseFactory.java:106)
> > > > >> > > > at
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > > io.druid.segment.realtime.RealtimeManager$FireChief.run(RealtimeMana
> > > ge
> > > r.java:234)
> > > > >> > > >
> > > > >> > > > My questions are:
> > > > >> > > > 1) What is the right way to bounce a Kafka broker?
> > > > >> > > > 2) Is this a bug in Samza that the job hangs after
> > > > >> > > > producer
> > > > request
> > > > >> > > fails?
> > > > >> > > > Has anyone seen this?
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > >
> > > > >> > > > Roger
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to