[ https://issues.apache.org/jira/browse/KAFKA-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava resolved KAFKA-2496.
------------------------------------------
Resolution: Invalid
Assignee: Ewen Cheslack-Postava
[~serejja] Two points:
1. This isn't expected to work. The necessary broker functionality wasn't in
place in the 0.8.2 releases. https://issues.apache.org/jira/browse/KAFKA-1326
tracks the progress on the new consumer implementation, including broker-side
support; for example, it links to
https://issues.apache.org/jira/browse/KAFKA-1335, which is a critical piece of
the consumer coordinator implementation on the broker and has Fix Version
marked as 0.8.3 (checked into trunk, but not yet released).
2. The recommended upgrade path is to upgrade brokers first, and then clients.
This ensures that any features that require broker support (like the consumer
coordinator for the new consumer) can be rolled out smoothly. It also keeps
development much simpler -- only the broker needs to handle multiple
request/response formats.
Note that the new consumer protocol is still undergoing a few changes (e.g.,
https://issues.apache.org/jira/browse/KAFKA-2464), so even if the
implementation had been there, I'm not sure we'd want to support older
versions: it would complicate things significantly, and none of that code was
intended to be supported in 0.8.2 releases.
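To illustrate both points, here is a minimal, self-contained sketch of
broker-side request dispatch. It is not the actual KafkaApis code: the Request
and Broker types and the version numbers are hypothetical. Only the JoinGroup
API key (11, taken from the broker log in the report) and the Fetch API key (1)
reflect the real protocol. It shows why a broker without a JoinGroup handler
rejects the request, and why a newer broker can still serve older clients.

```scala
// Hypothetical, simplified model of broker-side request dispatch.
// Request, Broker and the version numbers are illustrative assumptions.
object DispatchSketch {
  final case class Request(apiKey: Int, apiVersion: Int, clientId: String)

  // A broker knows a fixed set of API keys and, for each key, the highest
  // request version it can parse.
  final class Broker(supported: Map[Int, Int]) {
    def handle(req: Request): Either[String, String] =
      supported.get(req.apiKey) match {
        case None =>
          // No handler at all: the situation reported in this issue.
          Left(s"Unknown api code ${req.apiKey}")
        case Some(maxVersion) if req.apiVersion > maxVersion =>
          // Handler exists, but the client speaks a newer format.
          Left(s"Unsupported version ${req.apiVersion} for api code ${req.apiKey}")
        case Some(_) =>
          // Same or older format: the broker can serve it.
          Right(s"handled api code ${req.apiKey} v${req.apiVersion}")
      }
  }

  val FetchKey = 1
  val JoinGroupKey = 11 // from the broker log: "Unknown api code 11"

  // An older broker without consumer coordinator support.
  val oldBroker = new Broker(Map(FetchKey -> 0))
  // A newer broker: adds JoinGroup and a newer Fetch version (assumed).
  val newBroker = new Broker(Map(FetchKey -> 1, JoinGroupKey -> 0))

  def main(args: Array[String]): Unit = {
    val joinGroup = Request(JoinGroupKey, 0, "consumer-1")
    println(oldBroker.handle(joinGroup))                          // new client, old broker
    println(newBroker.handle(joinGroup))                          // new client, new broker
    println(newBroker.handle(Request(FetchKey, 0, "old-client"))) // old client, new broker
  }
}
```

In this model only the broker carries a table of supported versions, so
upgrading brokers first keeps every client/broker combination on a handled
path.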
> New consumer from trunk doesn't work with 0.8.2.1 brokers
> ---------------------------------------------------------
>
> Key: KAFKA-2496
> URL: https://issues.apache.org/jira/browse/KAFKA-2496
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.2.1
> Reporter: Serhey Novachenko
> Assignee: Ewen Cheslack-Postava
>
> I have a 0.8.2.1 broker running with a topic created and some messages in it.
> I also have a consumer built from trunk (commit
> 9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).
> When trying to consume messages from this topic, the consumer fails with the
> following stack trace:
> {noformat}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
>     at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
>     at org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
>     at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
>     at org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
>     at org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
>     at org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
>     at Sandbox$.main(Sandbox.scala:38)
>     at Sandbox.main(Sandbox.scala)
> {noformat}
> What actually happens is that the broker is unable to handle the JoinGroup
> request from the consumer:
> {noformat}
> [2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body: {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range} (kafka.server.KafkaApis)
> kafka.common.KafkaException: Unknown api code 11
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The consumer code that leads to this is pretty straightforward:
> {noformat}
> import java.util.Properties
>
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import scala.collection.JavaConverters._
>
> object Sandbox {
>   def main(args: Array[String]): Unit = {
>     // Minimal configuration for the new consumer.
>     val consumerProps = new Properties
>     consumerProps.put("bootstrap.servers", "localhost:9092")
>     consumerProps.put("group.id", "mirror_maker_group")
>     consumerProps.put("enable.auto.commit", "false")
>     consumerProps.put("session.timeout.ms", "30000")
>     consumerProps.put("key.deserializer",
>       "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>     consumerProps.put("value.deserializer",
>       "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>
>     // Subscribe to the topic, poll once, and print each record's offset.
>     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
>     consumer.subscribe(List("mirror").asJava)
>     val records = consumer.poll(1000)
>     for (record <- records.iterator().asScala) {
>       println(record.offset())
>     }
>   }
> }
> {noformat}
> I looked into the source code of the Kafka server in the 0.8.2.1 branch, and
> it does not have the logic to handle the JoinGroup request. In fact, it
> appears to lack the consumer coordination logic altogether, so I wonder if
> there is any way to make the new consumer work with 0.8.2.1 brokers?