Serhey Novachenko created KAFKA-2496:
----------------------------------------

             Summary: New consumer from trunk doesn't work with 0.8.2.1 brokers
                 Key: KAFKA-2496
                 URL: https://issues.apache.org/jira/browse/KAFKA-2496
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8.2.1
            Reporter: Serhey Novachenko


I have a 0.8.2.1 broker running with a topic created and some messages in it.
I also have a consumer built from trunk (commit 
9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).

When I try to consume messages from this topic, the consumer fails with the 
following stack trace:

{noformat}
Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
        at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
        at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
        at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
        at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
        at org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
        at org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
        at Sandbox$.main(Sandbox.scala:38)
        at Sandbox.main(Sandbox.scala)
{noformat}

What actually happens is that the broker is unable to handle the JoinGroup 
request from the consumer:

{noformat}
[2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body: {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range} (kafka.server.KafkaApis)
kafka.common.KafkaException: Unknown api code 11
        at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

The consumer code that leads to this is fairly straightforward:

{noformat}
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object Sandbox {
  def main(args: Array[String]): Unit = {
    // Consumer configuration; auto commit is disabled.
    val consumerProps = new Properties
    consumerProps.put("bootstrap.servers", "localhost:9092")
    consumerProps.put("group.id", "mirror_maker_group")
    consumerProps.put("enable.auto.commit", "false")
    consumerProps.put("session.timeout.ms", "30000")
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
    consumer.subscribe(List("mirror").asJava)

    // The exception in the stack trace above is thrown from this poll() call.
    val records = consumer.poll(1000)
    // Print the offset of every record returned by the poll.
    for (record <- records.iterator().asScala) {
      println(record.offset())
    }
  }
}
{noformat}
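
For reference, the poll call can be wrapped so that the failure is reported as 
a readable message instead of terminating main(). This is only a minimal 
sketch and does not make the consumer compatible with the broker. The names 
PollGuardSketch and guardedPoll are hypothetical, and the helper deliberately 
avoids any Kafka dependency: it relies only on the exception being a 
RuntimeException, which I believe holds for 
org.apache.kafka.common.KafkaException.

{noformat}
import scala.util.{Failure, Success, Try}

object PollGuardSketch {
  // Hypothetical helper, not part of the Kafka client API.
  // Evaluates one poll attempt (passed by name) and converts a thrown
  // RuntimeException into a Left with a readable message.
  def guardedPoll[A](brokers: String)(poll: => A): Either[String, A] =
    Try(poll) match {
      case Success(records) => Right(records)
      case Failure(e: RuntimeException) =>
        Left(s"poll against $brokers failed: ${e.getMessage}")
      // Anything else (e.g. checked exceptions) is rethrown unchanged.
      case Failure(other) => throw other
    }
}
{noformat}

In the Sandbox code this would be called as 
PollGuardSketch.guardedPoll("localhost:9092")(consumer.poll(1000)).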

I looked into the source code of the Kafka server in the 0.8.2.1 branch and it 
does not have the logic to handle the JoinGroup request. In fact, none of the 
consumer coordination logic seems to be there, so I wonder whether there is 
any way to make the new consumer work with 0.8.2.1 brokers?
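
To illustrate what the broker log suggests, here is a simplified model of a 
request dispatcher that rejects any API key it has no handler for. It is a 
sketch, not the actual KafkaApis code: the object and method names are 
hypothetical, and the set of handled keys (0 through 10) is my assumption. 
Only the mapping of key 11 to JoinGroup comes from the log above.

{noformat}
object ApiKeyDispatchSketch {
  // Assumption: the older broker has handlers for API keys 0 through 10 only.
  val handledApiKeys: Set[Short] = (0 to 10).map(_.toShort).toSet

  // From the broker log above: JoinGroup arrives with API key 11.
  val JoinGroupKey: Short = 11

  // Returns Right for a key with a handler, Left with an error otherwise.
  def handle(apiKey: Short): Either[String, String] =
    if (handledApiKeys.contains(apiKey)) Right(s"handled api key $apiKey")
    else Left(s"Unknown api code $apiKey")

  def main(args: Array[String]): Unit = {
    println(handle(0.toShort))
    println(handle(JoinGroupKey))
  }
}
{noformat}

Under this model, any request type the broker has no handler for fails the 
same way JoinGroup does here, regardless of what the consumer sends in the 
request body.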



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
