[ 
https://issues.apache.org/jira/browse/KAFKA-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14726448#comment-14726448
 ] 

Jay Kreps commented on KAFKA-2496:
----------------------------------

The first response is expected. The group management feature didn't exist in 
0.8.2.1.

The second case is a bug: the error message should say something like 
"unrecognized version for fetch request".

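A minimal sketch of the distinction between the two cases, assuming a hypothetical `RequestValidation` helper that is not part of the Kafka code base. The table of supported API keys and versions is illustrative only. The point is that an unknown API key (such as the JoinGroup key 11 in the broker log quoted below) and an unsupported version of a known API produce different, descriptive messages:

```scala
// Hypothetical sketch; none of these names come from the Kafka code base.
object RequestValidation {
  // API key -> (request name, highest supported version).
  // Illustrative values only, assumed for this example.
  val supported: Map[Int, (String, Int)] = Map(
    0 -> ("Produce", 0),
    1 -> ("Fetch", 0)
  )

  // Returns Right(request name) when the request can be handled,
  // otherwise Left with a message describing why it cannot.
  def validate(apiKey: Int, version: Int): Either[String, String] =
    supported.get(apiKey) match {
      case None =>
        // The API itself is unknown to this broker.
        Left(s"unknown api key $apiKey")
      case Some((name, maxVersion)) if version < 0 || version > maxVersion =>
        // The API is known, but not at the requested version.
        Left(s"unrecognized version $version for ${name.toLowerCase} request")
      case Some((name, _)) =>
        Right(name)
    }
}
```

Under these assumptions, `RequestValidation.validate(11, 0)` yields the unknown-key message, while `RequestValidation.validate(1, 1)` yields the unrecognized-version message for a fetch request.
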
> New consumer from trunk doesn't work with 0.8.2.1 brokers
> ---------------------------------------------------------
>
>                 Key: KAFKA-2496
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2496
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1
>            Reporter: Serhey Novachenko
>            Assignee: Ewen Cheslack-Postava
>
> I have a 0.8.2.1 broker running with a topic created and some messages in it.
> I also have a consumer built from trunk (commit 
> 9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).
> When trying to consume messages from this topic, the consumer fails with the 
> following stack trace:
> {noformat}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
>       at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
>       at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
>       at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
>       at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
>       at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
>       at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
>       at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
>       at org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
>       at org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
>       at Sandbox$.main(Sandbox.scala:38)
>       at Sandbox.main(Sandbox.scala)
> {noformat}
> What actually happens is that the broker is unable to handle the JoinGroup 
> request from the consumer:
> {noformat}
> [2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body: {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range} (kafka.server.KafkaApis)
> kafka.common.KafkaException: Unknown api code 11
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The consumer code that leads to this is fairly straightforward:
> {noformat}
> import java.util.Properties
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import scala.collection.JavaConverters._
> object Sandbox {
>   def main(args: Array[String]): Unit = {
>     // Configuration for the new consumer, pointing at the 0.8.2.1 broker.
>     val consumerProps = new Properties
>     consumerProps.put("bootstrap.servers", "localhost:9092")
>     consumerProps.put("group.id", "mirror_maker_group")
>     consumerProps.put("enable.auto.commit", "false")
>     consumerProps.put("session.timeout.ms", "30000")
>     consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
>     // Subscribing by topic relies on group management, so poll() sends a JoinGroup request.
>     consumer.subscribe(List("mirror").asJava)
>     val records = consumer.poll(1000)
>     // Print the offset of each record that was fetched.
>     for (record <- records.iterator().asScala) {
>       println(record.offset())
>     }
>   }
> }
> {noformat}
> I looked into the source code of the Kafka server in the 0.8.2.1 branch, and it 
> does not have the logic to handle the JoinGroup request. In fact, it has none 
> of the logic related to consumer coordination, so I wonder if there is any 
> way to make the new consumer work with 0.8.2.1 brokers?


