Hi Ismael,

I looked again at the problem where I get ACK error 2 (InvalidMessage).
When the error occurs, I see the error message with stack trace shown below.
For the 8-partition topic "shown_news_stories", which I am sending messages
to, only partition 7 has its leader replica on the broker running Kafka
0.9.0.0. For each of partitions 0-6, the leader replica is on one of the
other two brokers (which run Kafka 0.8.2.1).

Interestingly, none of the messages currently going to the topic use log
compaction (i.e. they all have empty keys), although at some time in the
past I may have sent a few messages with keys.  Log compaction is being
used for other topics.  So the 0.9.0.0 broker seems to think the topic is
compacted, while the 0.8.2.1 brokers apparently don't think so.  Does this
shed any light on things?

Also, I notice the error message says "Compacted topic", which suggests that
compaction is a property of the topic, and not of individual messages as
determined by key or lack thereof.  I thought it was OK to send messages
both with and without a key to the same topic, thus having compaction
enabled for only a subset of the messages.  Is this incorrect?
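
To make the question concrete, here is a minimal Python sketch of the check
the error message seems to describe.  It is only a model of my understanding,
not the broker's actual code: the names Message, validate_for_append and
topic_is_compacted are made up for illustration, and I am assuming that an
"empty" key means a null key and that the compacted flag comes from the
topic's configuration rather than from the messages themselves.

```python
from dataclasses import dataclass
from typing import List, Optional


class InvalidMessageError(Exception):
    """Stands in for the InvalidMessageException seen in the broker log."""


@dataclass
class Message:
    key: Optional[bytes]  # None models a message sent without a key (assumption)
    value: bytes


def validate_for_append(messages: List[Message], topic_is_compacted: bool) -> int:
    """Hypothetical model: the compaction flag belongs to the topic, so any
    keyless message is rejected once the topic is marked as compacted."""
    for message in messages:
        if topic_is_compacted and message.key is None:
            raise InvalidMessageError(
                "Compacted topic cannot accept message without key.")
    # Return how many messages would be appended.
    return len(messages)
```

Under this model, the same keyless message is accepted when
topic_is_compacted is False and rejected when it is True, which would match
what I see if only the 0.9.0.0 broker treats the topic as compacted.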

Thanks,
Dave


[2016-01-20 19:21:44,923] ERROR [Replica Manager on Broker 172341926]: Error processing append operation on partition [shown_news_stories,7] (kafka.server.ReplicaManager)
kafka.message.InvalidMessageException: Compacted topic cannot accept message without key.
        at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:250)
        at kafka.log.Log.liftedTree1$1(Log.scala:327)
        at kafka.log.Log.append(Log.scala:326)
        at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
        at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)



On Tue, Jan 19, 2016 at 2:50 AM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Dave,
>
> Do you get any errors logged in the broker when you get ACK error 2
> (InvalidMessage) while sending produce requests to a mixed-version
> cluster? It would be helpful to see them.
>
> With regards to the kafka-console-producer.sh error, did you use the
> 0.9.0.0 console producer with a mixed-version cluster (i.e. some brokers
> were on 0.8.2.1 while others were on 0.9.0.0)? If so, it is expected that
> it won't work correctly. All the brokers should be upgraded before the
> clients are upgraded (otherwise the 0.8.2.1 broker will send a response
> that the newer clients cannot handle).
>
> Ismael
>
> On Fri, Jan 15, 2016 at 7:52 PM, Dave Peterson <d...@academia.edu> wrote:
>
> > Hi Ismael,
> >
> > I'm using bruce (https://github.com/ifwe/bruce) to send the produce
> > requests, with a RequiredAcks value of 1.  Everything works fine when
> > all brokers are running 0.8.2.1.  Also if I set up a new 0.9.0.0
> > cluster from scratch rather than trying to upgrade, everything works
> > fine.  The problem only occurs after upgrading one broker in the
> > 3-broker cluster.
> >
> > The topic I am sending to has 8 partitions numbered 0-7.  Doing
> > further experimentation I see that the ACK error 2 occurs only when
> > I send to partition 7.  No problems occur when sending to partitions
> > 0-6.  If it helps I can send output from "kafka-topics.sh --describe"
> > as well as tcpdump output showing the produce requests and responses.
> >
> > For comparison I tried using the 0.9.0.0 version of
> > kafka-console-producer.sh to send messages.  With the default
> > RequiredAcks value of 0, it worked although I don't know which
> > partition it sent to.  With a RequiredAcks value of 1 I get the
> > output shown below.
> >
> > Thanks,
> > Dave
> >
> >
> >
> > [2016-01-15 10:33:28,843] ERROR Uncaught error in kafka producer I/O thread:  (org.apache.kafka.clients.producer.internals.Sender)
> > org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
> >         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
> >         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
> >         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
> >         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> >         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> >         at java.lang.Thread.run(Thread.java:745)
> >
> >
> > On Fri, Jan 15, 2016 at 1:06 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Hi Dave,
> > >
> > > On Fri, Jan 15, 2016 at 2:04 AM, Dave Peterson <d...@academia.edu> wrote:
> > >
> > > > I was trying to upgrade an 0.8.2.1 broker cluster to 0.9.0.0 by
> > > > following the instructions here:
> > > >
> > > >     http://kafka.apache.org/documentation.html#upgrade
> > > >
> > > > After upgrading one broker, with
> > > > inter.broker.protocol.version=0.8.2.X set, I get ACK error 2
> > > > (InvalidMessage) when I try to send produce requests.
> > >
> > >
> > > I haven't seen other reports of this issue yet. Also, we have a system
> > > test that covers this scenario:
> > >
> > > https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/upgrade_test.py
> > >
> > > Just to double-check, what is the version of the producer that you are
> > > using to send produce requests to the 0.9.0.0 broker when you get the
> > > error?
> > >
> > > Ismael
> > >
> >
>
