Hi Kafka users,

I have an issue with saving Kafka offsets to Zookeeper through OffsetCommitRequest. It's the same issue I found unanswered on SO, so I'm borrowing its description:

http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper

"I've installed Zookeeper and Kafka from Ambari, on CentOS 7.

Ambari version: 2.1.2.1
Zookeeper version: 3.4.6.2.3
Kafka version: 0.8.2.2.3
Java Kafka client: kafka_2.10, 0.8.2.2

I'm trying to save the Kafka offset, using the following code:

    SimpleConsumer simpleConsumer = new SimpleConsumer(host, port, soTimeout, bufferSize, clientId);
    TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
    Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
    requestInfo.put(topicAndPartition, new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
    OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(groupName, requestInfo, correlationId, clientName, (short)0);
    simpleConsumer.commitOffsets(offsetCommitRequest);
    simpleConsumer.close();

But when I run this, I get the following error in my client:

    java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.

Also in the Kafka logs I have the following error:

    [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1 because of error (kafka.network.Processor)
    java.nio.BufferUnderflowException
        at java.nio.Buffer.nextGetIndex(Buffer.java:498)
        at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
        at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
        at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
        at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
        at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
        at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
        at kafka.network.Processor.read(SocketServer.scala:547)
        at kafka.network.Processor.run(SocketServer.scala:405)
        at java.lang.Thread.run(Thread.java:745)

Now I've also downloaded and installed the official Kafka 0.8.2.2 version from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz and it works OK; you can save the Kafka offset without any error.

Can anybody give me some direction on why the Ambari Kafka is failing to save the offset?

P.S: I know that if versionId is 0 (in OffsetCommitRequest), then the offset is actually saved in Zookeeper."

The only difference in my setup (IMHO irrelevant) is that I'm using HDP 2.3.2; otherwise the versions are the same.

Do you guys have any hints on what could be wrong? Is there something wrong with how I commit offsets, or is it a version conflict?

Any hints would be highly appreciated :)

Cheers,
Krzysztof
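
P.S. As far as I can tell from the broker trace, OffsetCommitRequest.readFrom calls getLong on a buffer that has fewer than 8 bytes left, i.e. the broker tries to read more bytes than the request contains. Below is a minimal Java sketch of just that mechanism. The two-long layout and the names UnderflowDemo, withLongs and parses are made up for illustration; this is not Kafka's actual wire format, and it does not show which side is at fault.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

class UnderflowDemo {
    // Builds a buffer holding the given longs, ready for reading.
    static ByteBuffer withLongs(long... values) {
        ByteBuffer buffer = ByteBuffer.allocate(8 * values.length);
        for (long value : values) {
            buffer.putLong(value);
        }
        buffer.flip();
        return buffer;
    }

    // Hypothetical parser that expects two 8-byte fields per request.
    // Returns false if the request is shorter than the parser assumes.
    static boolean parses(ByteBuffer request) {
        try {
            request.getLong(); // first field, present in both requests below
            request.getLong(); // second field, missing in the short request
            return true;
        } catch (BufferUnderflowException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("one-long request parses: " + parses(withLongs(42L)));
        System.out.println("two-long request parses: " + parses(withLongs(42L, -1L)));
    }
}
```

The first call hits the same java.nio.BufferUnderflowException as in the broker log, which is why I suspect the client and the broker disagree about the request layout.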