Closing socket connection to /127.0.0.1. (kafka.network.Processor)

2017-01-31 Thread Anjani Gupta
Hi All,
I am using kafka_2.10-0.8.1 and I am seeing issues while consuming messages
using the SimpleConsumer API.

As new messages are produced, they are not retrieved by the SimpleConsumer
API, and the Kafka server console shows the following:

[2017-01-31 10:19:47,007] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

But the same messages are displayed by kafka-console-consumer.sh. Any idea
what's going on here?


Re: kafka_2.10-0.8.1 simple consumer retrieves junk data in the message

2017-01-31 Thread Anjani Gupta
We use the following method to deserialize messages consumed with the
SimpleConsumer:

DatumReader<T> datumReader = new SpecificDatumReader<>(className);
ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray);
Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
T object = datumReader.read(null, decoder);
IOUtils.closeQuietly(inputStream);


It does not seem to handle the header bytes. When I remove those 26 bytes,
deserialization works fine. Please note, we are using the SimpleConsumer API,
not the high-level consumer.
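
For reference, this is the workaround we are experimenting with (untested
beyond our setup, and it assumes the 26 extra bytes are just the 0.8
message-set framing plus the message header: offset 8 + size 4 + crc 4 +
magic 1 + attributes 1 + key length 4 + value length 4 for a null key). The
idea is to copy only the payload's remaining bytes out of the ByteBuffer
instead of calling array() on it, then decode:

import java.nio.ByteBuffer;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

// Copy only the payload bytes; array() would expose the whole backing array,
// which can also contain the bytes that precede the payload.
ByteBuffer payload = messageAndOffset.message().payload();
byte[] byteArray = new byte[payload.remaining()];
payload.get(byteArray);

DatumReader<T> datumReader = new SpecificDatumReader<>(className);
Decoder decoder = DecoderFactory.get().binaryDecoder(byteArray, null);
T object = datumReader.read(null, decoder);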


On Mon, Jan 30, 2017 at 10:57 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> What are the 26 additional bytes? That sounds like a header that a
> decoder/deserializer is handling with the high level consumer. What class
> are you using to deserialize the messages with the high level consumer?
>
> -Ewen
>
> On Fri, Jan 27, 2017 at 10:19 AM, Anjani Gupta <anjani.gu...@salesforce.com> wrote:
>
> > I am using kafka_2.10-0.8.1 and trying to fetch messages using Simple
> > Consumer API. I notice that byte array for message retrieved has 26 junk
> > bytes appended at the beginning  of original message sent by producer.
> Any
> > idea what's going on here? This works fine with High level consumer.
> >
> > This is how my code looks like -
> >
> > TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
> > OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
> >     new OffsetFetchRequest(GROUP_ID,
> >         Collections.singletonList(topicAndPartition), (short) 0, 0, CLIENT_ID));
> >
> > // Fetch messages from Kafka.
> > FetchRequest req = new FetchRequestBuilder()
> >     .clientId(CLIENT_ID)
> >     .addFetch(topic, partition, readOffset, 1000)
> >     .build();
> > FetchResponse fetchResponse = consumer.fetch(req);
> > for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
> >     byte[] message = messageAndOffset.message().payload().array();
> > }
> >
> > Here message has additional 26 bytes appended to beginning of array.
> >
> >
> > Thanks,
> > Anjani
> >
>


kafka_2.10-0.8.1 simple consumer retrieves junk data in the message

2017-01-27 Thread Anjani Gupta
I am using kafka_2.10-0.8.1 and trying to fetch messages using the
SimpleConsumer API. I notice that the byte array of the retrieved message has
26 junk bytes prepended to the original message sent by the producer. Any idea
what's going on here? This works fine with the high-level consumer.

This is what my code looks like:

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
    new OffsetFetchRequest(GROUP_ID,
        Collections.singletonList(topicAndPartition), (short) 0, 0, CLIENT_ID));

// Fetch messages from Kafka.
FetchRequest req = new FetchRequestBuilder()
    .clientId(CLIENT_ID)
    .addFetch(topic, partition, readOffset, 1000)
    .build();
FetchResponse fetchResponse = consumer.fetch(req);
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topicName, partition)) {
    byte[] message = messageAndOffset.message().payload().array();
}

Here, message has an additional 26 bytes prepended to the beginning of the array.
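
One detail that may be relevant here (just a guess on my part):
message().payload() returns a ByteBuffer view into a larger backing array, so
calling array() on it can also return the framing bytes that precede the
payload. A minimal sketch of copying only the payload bytes instead:

// Copy only the payload's remaining bytes rather than exposing the backing array.
ByteBuffer payload = messageAndOffset.message().payload();
byte[] message = new byte[payload.remaining()];
payload.get(message);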


Thanks,
Anjani


Fwd: How to disable auto commit for SimpleConsumer kafka 0.8.1

2016-12-06 Thread Anjani Gupta
I want to disable auto commit for the Kafka SimpleConsumer. I am using version
0.8.1. For the high-level consumer, config options can be set and passed via a
consumerConfig as follows:
kafka.consumer.Consumer.createJavaConsumerConnector(this.consumerConfig);
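
To make that concrete, this is roughly what we pass for the high-level
consumer (the ZooKeeper address and group id below are just placeholders from
our test setup):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// High-level consumer: auto commit is a client-side property, set in code,
// not in the broker's consumer.properties file.
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder
props.put("group.id", "testGroup");
props.put("auto.commit.enable", "false");
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);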

How can I achieve the same for the SimpleConsumer? I mainly want to disable
auto commit. I tried setting auto commit to false in consumer.properties and
restarted the Kafka server, ZooKeeper, and the producer, but that does not
work. I think I need to apply this setting through code, not in
consumer.properties. Can anyone help here?

Here is what my code looks like:

List<TopicAndPartition> topicAndPartitionList = new ArrayList<>();
topicAndPartitionList.add(topicAndPartition);
OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(
    new OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0,
        correlationId, clientName));

Map<TopicAndPartition, OffsetMetadataAndError> offsets = offsetFetchResponse.offsets();
FetchRequest req = new FetchRequestBuilder()
    .clientId(clientName)
    .addFetch(a_topic, a_partition, offsets.get(topicAndPartition).offset(), 10)
    .build();

long readOffset = offsets.get(topicAndPartition).offset();
FetchResponse fetchResponse = consumer.fetch(req);

// Consume messages from fetchResponse

Map<TopicAndPartition, OffsetMetadataAndError> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition,
    new OffsetMetadataAndError(readOffset, "metadata", (short) 0));
OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(
    new OffsetCommitRequest("testGroup", requestInfo, (short) 0,
        correlationId, clientName));


If the above code crashes before committing the offset, I still get the latest
offset from offsets.get(topicAndPartition).offset() in the next run, which
makes me think the offset is being auto-committed as the code executes.

