Hi Ismael, I looked again at the problem where I get ACK error 2 (InvalidMessage). When the error occurs, I see the error message and stack trace shown below. For the 8-partition topic "shown_news_stories", to which I am sending messages, only partition 7 has its lead replica on the broker running Kafka 0.9.0.0. For each of partitions 0-6, the lead replica is on one of the other two brokers (which run Kafka 0.8.2.1).
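(For reference, the leader placement described above, along with any topic-level configs such as cleanup.policy, can be checked with the standard describe command; the ZooKeeper address here is a placeholder for your own:)

```shell
# Show leader, replicas, ISR per partition, plus topic-level configs
# (e.g. cleanup.policy=compact). Replace localhost:2181 with your
# cluster's ZooKeeper connect string.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic shown_news_stories
```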
Interestingly, none of the messages currently going to the topic use compaction (i.e. they all have empty keys), although at some time in the past I may have sent a few messages with keys. Compaction is being used for other topics. So the 0.9.0.0 broker seems to think the topic is compacted, while the 0.8.2.1 brokers apparently don't. Does this shed any light on things? Also, I notice the error message says "Compacted topic", which suggests that compaction is a property of the topic, not of individual messages as determined by the presence or absence of a key. I thought it was OK to send messages both with and without a key to the same topic, thus having compaction apply to only a subset of the messages. Is this incorrect?

Thanks,
Dave

[2016-01-20 19:21:44,923] ERROR [Replica Manager on Broker 172341926]: Error processing append operation on partition [shown_news_stories,7] (kafka.server.ReplicaManager)
kafka.message.InvalidMessageException: Compacted topic cannot accept message without key.
        at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:250)
        at kafka.log.Log.liftedTree1$1(Log.scala:327)
        at kafka.log.Log.append(Log.scala:326)
        at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
        at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
        at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
        at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
        at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)

On Tue, Jan 19, 2016 at 2:50 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> Hi Dave,
>
> Do you get any errors logged in the broker when you get ACK error 2
> (InvalidMessage) while producing requests to a mixed version cluster? It
> would be helpful to see them.
> > With regards to the kafka-console-producer.sh error, did you use the > 0.9.0.0 console producer with a mixed version cluster (ie some brokers were > on 0.8.2.1 while others were on 0.9.0.0)? If so, it is expected that it > won't work correctly. All the brokers should be upgraded before the clients > are upgraded (otherwise the 0.8.2.1 broker will send a response that the > newer clients cannot handle). > > Ismael > > On Fri, Jan 15, 2016 at 7:52 PM, Dave Peterson <d...@academia.edu> wrote: > > > Hi Ismael, > > > > I'm using bruce (https://github.com/ifwe/bruce) to send the produce > > requests, with a RequiredAcks value of 1. Everything works fine when > > all brokers are running 0.8.2.1. Also if I set up a new 0.9.0.0 > > cluster from scratch rather than trying to upgrade, everything works > > fine. The problem only occurs after upgrading one broker in the > > 3-broker cluster. > > > > The topic I am sending to has 8 partitions numbered 0-7. Doing > > further experimentation I see that the ACK error 2 occurs only when > > I send to partition 7. No problems occur when sending to partitions > > 0-6. If it helps I can send output from "kafka-topics.sh --describe" > > as well as tcpdump output showing the produce requests and responses. > > > > For comparison I tried using the 0.9.0.0 version of > > kafka-console-producer.sh to send messages. With the default > > RequiredAcks value of 0, it worked although I don't know which > > partition it sent to. With a RequiredAcks value of 1 I get the > > output shown below. 
> >
> > Thanks,
> > Dave
> >
> > [2016-01-15 10:33:28,843] ERROR Uncaught error in kafka producer I/O
> > thread: (org.apache.kafka.clients.producer.internals.Sender)
> > org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
> >         at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
> >         at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
> >         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
> >         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> >         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> >         at java.lang.Thread.run(Thread.java:745)
> >
> > On Fri, Jan 15, 2016 at 1:06 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > Hi Dave,
> > >
> > > On Fri, Jan 15, 2016 at 2:04 AM, Dave Peterson <d...@academia.edu> wrote:
> > > > I was trying to upgrade an 0.8.2.1 broker cluster to 0.9.0.0 by
> > > > following the instructions here:
> > > >
> > > > http://kafka.apache.org/documentation.html#upgrade
> > > >
> > > > After upgrading one broker, with inter.broker.protocol.version=0.8.2.X
> > > > set, I get ACK error 2 (InvalidMessage) when I try to send produce
> > > > requests.
> > >
> > > I haven't seen other reports of this issue yet. Also, we have a system test
> > > that covers this scenario:
> > >
> > > https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/upgrade_test.py
> > >
> > > Just to double-check, what is the version of the producer that you are
> > > using to send produce requests to the 0.9.0.0 broker when you get the
> > > error?
> > >
> > > Ismael
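(Editor's note: the rule behind the InvalidMessageException in this thread can be illustrated with a toy sketch. This is not Kafka's actual code; the function and types are made up. The point is that cleanup.policy=compact is a topic-level setting, so once it is set, every message in the topic must carry a key, regardless of whether other messages have keys.)

```python
# Toy illustration of the broker-side check that yields
# "Compacted topic cannot accept message without key."
# NOT Kafka's real implementation; names are hypothetical.

def validate_batch(messages, cleanup_policy):
    """Reject keyless messages when the topic's cleanup.policy is "compact".

    messages: list of (key, value) tuples, where key is None when absent.
    cleanup_policy: the topic-level setting, e.g. "delete" or "compact".
    """
    if cleanup_policy == "compact":
        for key, _value in messages:
            if key is None:
                raise ValueError(
                    "Compacted topic cannot accept message without key.")

# A non-compacted topic accepts a mix of keyed and keyless messages...
validate_batch([(None, b"story-1"), (b"id-2", b"story-2")], "delete")

# ...but the same batch is rejected once the topic is compacted,
# because the policy applies to the topic, not to individual messages.
try:
    validate_batch([(None, b"story-1"), (b"id-2", b"story-2")], "compact")
except ValueError as e:
    print(e)  # Compacted topic cannot accept message without key.
```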