Closing socket connection to /127.0.0.1. (kafka.network.Processor)
Hi All,

I am using kafka_2.10-0.8.1 and I am seeing issues while consuming messages with the Simple Consumer API. Newly produced messages are not retrieved by the Simple Consumer API, and the Kafka server console shows the following:

    [2017-01-31 10:19:47,007] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

The same messages are displayed by kafka-console-consumer.sh, though. Any idea what's going on here?
Re: kafka_2.10-0.8.1 simple consumer retrieves junk data in the message
We use the following method to deserialize the message consumed using the Simple Consumer:

    DatumReader<T> datumReader = new SpecificDatumReader<>(className);
    ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
    Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    T object = datumReader.read(null, decoder);
    IOUtils.closeQuietly(inputStream);

It does not seem to handle header bytes. When I remove those 26 bytes, deserialization works fine. Please note that we are using the Simple Consumer API, not the high level consumer.

On Mon, Jan 30, 2017 at 10:57 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> What are the 26 additional bytes? That sounds like a header that a
> decoder/deserializer is handling with the high level consumer. What class
> are you using to deserialize the messages with the high level consumer?
>
> -Ewen
>
> On Fri, Jan 27, 2017 at 10:19 AM, Anjani Gupta <anjani.gu...@salesforce.com> wrote:
>
> > I am using kafka_2.10-0.8.1 and trying to fetch messages using Simple
> > Consumer API. I notice that byte array for message retrieved has 26 junk
> > bytes appended at the beginning of original message sent by producer. Any
> > idea what's going on here? This works fine with High level consumer.
> >
> > This is how my code looks like -
> >
> >     TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
> >     OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new OffsetFetchRequest(GROUP_ID,
> >             Collections.singletonList(topicAndPartition), (short) 0, 0, CLIENT_ID));
> >
> >     //Fetch messages from Kafka.
> >     FetchRequest req = new FetchRequestBuilder()
> >             .clientId(CLIENT_ID)
> >             .addFetch(topic, partition, readOffset, 1000)
> >             .build();
> >     FetchResponse fetchResponse = consumer.fetch(req);
> >     for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
> >         byte[] message = messageAndOffset.message().payload().array();
> >     }
> >
> > Here message has additional 26 bytes appended to beginning of array.
> >
> > Thanks,
> > Anjani
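A possible explanation for the 26 bytes, offered as a sketch and not a confirmed diagnosis: `ByteBuffer.array()` returns the entire backing array and ignores the buffer's offset and limit. If the payload buffer is a view into a larger buffer that also holds the message's header fields (an assumption about how this client version lays out fetched data), `array()` would expose those leading bytes as well. For what it is worth, 26 would match an 8-byte offset, 4-byte size, 4-byte CRC, 1-byte magic, 1-byte attributes, 4-byte key length and 4-byte value length, if I have the 0.8 message layout right. The example below uses only `java.nio` and no Kafka classes; `PayloadCopy`, `copyPayload` and `simulatedPayload` are hypothetical names, and the header is simulated with filler bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadCopy {

    // Copies only the bytes between position and limit. Works on a duplicate
    // so the caller's buffer position is left untouched.
    static byte[] copyPayload(ByteBuffer payload) {
        ByteBuffer view = payload.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    // Stand-in for a fetched message (assumption, not Kafka code): headerSize
    // filler bytes followed by the body. Returns a slice covering only the body.
    static ByteBuffer simulatedPayload(int headerSize, byte[] body) {
        ByteBuffer whole = ByteBuffer.allocate(headerSize + body.length);
        whole.position(headerSize);
        whole.put(body);
        whole.position(headerSize);
        return whole.slice();
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer payload = simulatedPayload(26, body);

        // array() hands back the whole backing array, header included.
        System.out.println(payload.array().length);       // 31
        // Copying the remaining bytes yields just the body.
        System.out.println(copyPayload(payload).length);  // 5
        System.out.println(new String(copyPayload(payload), StandardCharsets.UTF_8)); // hello
    }
}
```

Applied to the loop in the question, that would mean replacing `payload().array()` with a copy of the payload buffer's remaining bytes before handing them to the Avro decoder.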
kafka_2.10-0.8.1 simple consumer retrieves junk data in the message
I am using kafka_2.10-0.8.1 and trying to fetch messages using the Simple Consumer API. I notice that the byte array for a retrieved message has 26 junk bytes prepended to the original message sent by the producer. Any idea what's going on here? This works fine with the high level consumer.

This is what my code looks like:

    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new OffsetFetchRequest(GROUP_ID,
            Collections.singletonList(topicAndPartition), (short) 0, 0, CLIENT_ID));

    // Fetch messages from Kafka.
    FetchRequest req = new FetchRequestBuilder()
            .clientId(CLIENT_ID)
            .addFetch(topic, partition, readOffset, 1000)
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
        byte[] message = messageAndOffset.message().payload().array();
    }

Here message has an additional 26 bytes at the beginning of the array.

Thanks,
Anjani
Fwd: How to disable auto commit for SimpleConsumer kafka 0.8.1
I want to disable auto commit for the Kafka SimpleConsumer. I am using version 0.8.1. For the high level consumer, config options can be set and passed via consumerConfig as follows:

    kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);

How can I achieve the same for SimpleConsumer? I mainly want to disable auto commit. I tried setting auto commit to false in consumer.properties and restarted the Kafka server, ZooKeeper and the producer, but that does not work. I think I need to apply this setting through code, not in consumer.properties. Can anyone help here?

Here is what my code looks like:

    List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
    topicAndPartitionList.add(topicAndPartition);
    OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new OffsetFetchRequest("testGroup",
            topicAndPartitionList, (short) 0, correlationId, clientName));
    Map<TopicAndPartition, OffsetMetadataAndError> offsets = offsetFetchResponse.offsets();
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, offsets.get(topicAndPartition).offset(), 10)
            .build();
    long readOffset = offsets.get(topicAndPartition).offset();
    FetchResponse fetchResponse = consumer.fetch(req);
    // Consume messages from fetchResponse
    Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<>();
    requestInfo.put(topicAndPartition, new OffsetMetadataAndError(readOffset, "metadata", (short) 0));
    OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new OffsetCommitRequest("testGroup",
            requestInfo, (short) 0, correlationId, clientName));

If the above code crashes before committing the offset, I still get the latest offset from offsets.get(topicAndPartition).offset() on the next run, which makes me think that the offset is auto committed as the code executes.
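As far as I know, SimpleConsumer has no auto-commit setting of its own: it only sends the requests the caller asks it to send, so a stored offset should change only when commitOffsets is called. The ordering that usually matters is "handle the messages first, commit the next offset afterwards". A minimal sketch of that ordering follows. It uses no Kafka classes: `OffsetStore`, `consume` and `CommitAfterProcessing` are hypothetical names, the list stands in for a partition, and committing the offset after the last handled message (instead of the offset the fetch started from, as in the posted code) is an assumption about the intended behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitAfterProcessing {

    // Hypothetical stand-in for the committed offset of one partition.
    // It is NOT a Kafka class; its value changes only when commit() is called.
    static class OffsetStore {
        private long committed = 0L;
        long fetch() { return committed; }
        void commit(long nextOffset) { committed = nextOffset; }
    }

    // Reads up to maxMessages starting at the committed offset, appends them
    // to out, and commits only after every message in the batch was handled.
    // A crash is simulated when the loop reaches offset failAt (-1 = never).
    static void consume(List<String> log, OffsetStore store, int maxMessages,
                        long failAt, List<String> out) {
        long readOffset = store.fetch();
        long end = Math.min(log.size(), readOffset + maxMessages);
        for (long offset = readOffset; offset < end; offset++) {
            if (offset == failAt) {
                throw new IllegalStateException("simulated crash at offset " + offset);
            }
            out.add(log.get((int) offset));
            readOffset = offset + 1; // offset of the next message to read
        }
        store.commit(readOffset);
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        OffsetStore store = new OffsetStore();
        List<String> seen = new ArrayList<>();

        consume(log, store, 2, -1, seen);      // handles a, b and commits 2
        try {
            consume(log, store, 2, 3, seen);   // handles c, then crashes at d
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(store.fetch());     // 2: the failed run committed nothing
        consume(log, store, 2, -1, seen);      // re-reads c, then d, commits 4
        System.out.println(seen + " " + store.fetch()); // [a, b, c, c, d] 4
    }
}
```

The repeated "c" is the trade-off of this ordering: a crash between handling and committing means the batch is read again on the next run, so the handling code has to tolerate duplicates.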