[
https://issues.apache.org/jira/browse/KAFKA-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725398#comment-14725398
]
Serhey Novachenko commented on KAFKA-2496:
------------------------------------------
I also tried assigning partitions instead of subscribing:
{noformat}
consumer.assign(List(new TopicPartition("mirror", 0)).asJava)
consumer.seek(new TopicPartition("mirror", 0), 0)
{noformat}
It looks like Kafka server does not fail now, but I get another exception while
trying to poll the consumer:
{noformat}
Exception in thread "main"
org.apache.kafka.common.protocol.types.SchemaException: Error reading field
'responses': Error reading field 'topic': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
at
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:379)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:229)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:780)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
at Sandbox$.main(Sandbox.scala:42)
at Sandbox.main(Sandbox.scala)
{noformat}
> New consumer from trunk doesn't work with 0.8.2.1 brokers
> ---------------------------------------------------------
>
> Key: KAFKA-2496
> URL: https://issues.apache.org/jira/browse/KAFKA-2496
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.2.1
> Reporter: Serhey Novachenko
>
> I have a 0.8.2.1 broker running with a topic created and some messages in it.
> I also have a consumer built from trunk (commit
> 9c936b186d390f59f1d4ad8cc2995f800036a3d6 to be precise).
> When trying to consume messages from this topic the consumer fails with a
> following stacktrace:
> {noformat}
> Exception in thread "main" org.apache.kafka.common.KafkaException: Unexpected
> error in join group response: The server experienced an unexpected error when
> processing the request
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:361)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$JoinGroupResponseHandler.handle(Coordinator.java:309)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:701)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator$CoordinatorResponseHandler.onSuccess(Coordinator.java:675)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:163)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:129)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:105)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:293)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:237)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:274)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:182)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:145)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator.reassignPartitions(Coordinator.java:195)
> at
> org.apache.kafka.clients.consumer.internals.Coordinator.ensurePartitionAssignment(Coordinator.java:170)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:770)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:731)
> at Sandbox$.main(Sandbox.scala:38)
> at Sandbox.main(Sandbox.scala)
> {noformat}
> What actually happens is broker being unable to handle the JoinGroup request
> from consumer:
> {noformat}
> [2015-09-01 11:48:38,820] ERROR [KafkaApi-0] error when handling request
> Name: JoinGroup; Version: 0; CorrelationId: 141; ClientId: consumer-1; Body:
> {group_id=mirror_maker_group,session_timeout=30000,topics=[mirror],consumer_id=,partition_assignment_strategy=range}
> (kafka.server.KafkaApis)
> kafka.common.KafkaException: Unknown api code 11
> at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The consumer code that leads to this is pretty much straightforward:
> {noformat}
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import scala.collection.JavaConverters._
> object Sandbox {
> def main(args: Array[String]) {
> val consumerProps = new Properties
> consumerProps.put("bootstrap.servers", "localhost:9092")
> consumerProps.put("group.id", "mirror_maker_group")
> consumerProps.put("enable.auto.commit", "false")
> consumerProps.put("session.timeout.ms", "30000")
> consumerProps.put("key.deserializer",
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> consumerProps.put("value.deserializer",
> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
> consumer.subscribe(List("mirror").asJava)
> val records = consumer.poll(1000)
> for (record <- records.iterator().asScala) {
> println(record.offset())
> }
> }
> }
> {noformat}
> I looked into the source code of the Kafka server in 0.8.2.1 branch and it
> does not have the logic to handle JoinGroup request. It does not actually
> have all the logic related to consumer coordination there so I wonder if
> there is any way to make the new consumer work with 0.8.2.1 brokers?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)