Hi Kafka users,
I have an issue with saving Kafka offsets to Zookeeper through
OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
kindly borrow the description:
http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper

"I've installed Zookeeper and Kafka from Ambari, on CentoS 7.

Ambari version: 2.1.2.1
Zookeeper version: 3.4.6.2.3
Kafka version: 0.8.2.2.3
Java Kafka client:kafka_2.10, 0.8.2.2

I'm trying to save the Kafka offset, using the following code:

SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
soTimeout, bufferSize, clientId);TopicAndPartition topicAndPartition =
new TopicAndPartition(topicName, partitionId);Map<TopicAndPartition,
OffsetAndMetadata> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, new OffsetAndMetadata(readOffset,
"", ErrorMapping.NoError()));OffsetCommitRequest offsetCommitRequest =
new OffsetCommitRequest(groupName, requestInfo, correlationId,
clientName, (short)0);
simpleConsumer.commitOffsets(offsetCommitRequest);
simpleConsumer.close();

But when I run this, I get the following error in my client:

java.io.EOFException: Received -1 when reading from channel, socket
has likely been closed.

Also in the Kafka logs I have the following error:

[2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1
because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
    at 
kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
    at 
kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at 
kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
    at 
kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
    at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
    at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
    at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
    at kafka.network.Processor.read(SocketServer.scala:547)
    at kafka.network.Processor.run(SocketServer.scala:405)
    at java.lang.Thread.run(Thread.java:745)

Now I've also downloaded and installed the official Kafka 0.8.2.2 version
from
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
and
it works ok; you can save the Kafka offset without any error.

Can anybody give me a some directions, why is the Ambari Kafka failing to
save the offset?

P.S: I know that if versionId is 0 (in OffsetCommitRequest), than the
offset is actually saved in Zookeeper.
"
My only difference (IMHO, irrelevant)  is that I'm using HDP in version
2.3.2, but other than that versions are the same.

Do you guys have any hints on what could be wrong? Is that something wrong
with my use of offset committing? Or conflict of versions?
Any hints would be highly appreciated :)
Cheers,
Krzysztof

Reply via email to