Re: Java Consumer Not reading message -

2018-02-18 Thread Debraj Manna
Thanks Matthias for replying.

 The answer has been discussed the stackoverflow link which I have posted
in the question.

On 16-Feb-2018 11:35 PM, "Matthias J. Sax"  wrote:

Can you check the committed offsets using bin/kafka-consumer-group.sh ?

Also inspect your consumer's position via KafkaConsumer#position() to
see where the consumer actually is in the topic.


-Matthias


On 2/16/18 5:13 AM, Debraj Manna wrote:
> I have posted the same question in stackoverflow also. But I have not got
> any reply there also
>
> https://stackoverflow.com/questions/48826279/kafka-0-10-
java-consumer-not-reading-message-from-topic
>
> On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna 
> wrote:
>
>> I have a simple java producer like below
>>
>> public class Producer
>> {
>> private final static String TOPIC = "my-example-topi8";
>> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>> public static void main( String[] args ) throws Exception {
>> Producer producer = createProducer();
>> for(int i=0;i<3000;i++) {
>> String msg = "Test Message-" + i;
>> final ProducerRecord record = new
>> ProducerRecord(TOPIC, "key" + i, msg.getBytes());
>> producer.send(record).get();
>> System.out.println("Sent message " + msg);
>> }
>> producer.close();
>> }
>>
>> private static Producer createProducer() {
>> Properties props = new Properties();
>> props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
>> props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>> props.put("client.id", "AppFromJava");
>> props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>> props.put("key.serializer.class", "kafka.serializer.
>> StringEncoder");
>> props.put("key.serializer", "org.apache.kafka.common.
>> serialization.StringSerializer");
>> props.put("compression.codec", "snappy");
>> props.put("value.serializer", "org.apache.kafka.common.
>> serialization.ByteArraySerializer");
>> return new KafkaProducer(props);
>> }
>> }
>>
>> I am trying to read data as below
>>
>> public class Consumer
>> {
>> private final static String TOPIC = "my-example-topi8";
>> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>> public static void main( String[] args ) throws Exception {
>> Consumer consumer = createConsumer();
>> start(consumer);
>> }
>>
>> static void start(Consumer consumer) throws
>> InterruptedException {
>> final int giveUp = 10;
>> int noRecordsCount = 0;
>> int stopCount = 1000;
>>
>> while (true) {
>> final ConsumerRecords consumerRecords =
>> consumer.poll(1000);
>> if (consumerRecords.count()==0) {
>> noRecordsCount++;
>> if (noRecordsCount > giveUp) break;
>> else continue;
>> }
>>
>>
>> consumerRecords.forEach(record -> {
>> System.out.printf("\nConsumer Record:(%s, %s, %s)",
>> record.key(), new String(record.value()), record.topic());
>> });
>>
>> consumer.commitSync();
>> break;
>> }
>> consumer.close();
>> System.out.println("DONE");
>> }
>>
>> private static Consumer createConsumer() {
>> final Properties props = new Properties();
>> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> BOOTSTRAP_SERVERS);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG,
>> "KafkaExampleConsumer");
>> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> StringDeserializer.class.getName());
>> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> ByteArrayDeserializer.class.getName());
>> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
>> props.put("group.id", "test");
>> props.put("enable.auto.commit", "false");
>>
>> // Create the consumer using props.
>> final Consumer consumer = new
KafkaConsumer(props);
>> consumer.subscribe(Collections.singletonList(TOPIC));
>> return consumer;
>> }
>> }
>>
>> But the consumer is not reading any message from kafka. If I add the
below
>> at the very start()
>>
>> consumer.poll(0);
>>
>> consumer.seekToBeginning(consumer.assignment());
>>
>>
>> Then the consumer starts reading from the topic. But then each time the
>> consumer is restarted it is reading message from the start of the topic
>> which I don;t want. Can someone let me know what is going wrong and how
can
>> I fix this?
>>
>>
>> Kafka Version 0.10
>>
>>
>>
>


Re: Java Consumer Not reading message -

2018-02-16 Thread Matthias J. Sax
Can you check the committed offsets using bin/kafka-consumer-group.sh ?

Also inspect your consumer's position via KafkaConsumer#position() to
see where the consumer actually is in the topic.


-Matthias


On 2/16/18 5:13 AM, Debraj Manna wrote:
> I have posted the same question in stackoverflow also. But I have not got
> any reply there also
> 
> https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic
> 
> On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna 
> wrote:
> 
>> I have a simple java producer like below
>>
>> public class Producer
>> {
>> private final static String TOPIC = "my-example-topi8";
>> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>> public static void main( String[] args ) throws Exception {
>> Producer producer = createProducer();
>> for(int i=0;i<3000;i++) {
>> String msg = "Test Message-" + i;
>> final ProducerRecord record = new
>> ProducerRecord(TOPIC, "key" + i, msg.getBytes());
>> producer.send(record).get();
>> System.out.println("Sent message " + msg);
>> }
>> producer.close();
>> }
>>
>> private static Producer createProducer() {
>> Properties props = new Properties();
>> props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
>> props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>> props.put("client.id", "AppFromJava");
>> props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>> props.put("key.serializer.class", "kafka.serializer.
>> StringEncoder");
>> props.put("key.serializer", "org.apache.kafka.common.
>> serialization.StringSerializer");
>> props.put("compression.codec", "snappy");
>> props.put("value.serializer", "org.apache.kafka.common.
>> serialization.ByteArraySerializer");
>> return new KafkaProducer(props);
>> }
>> }
>>
>> I am trying to read data as below
>>
>> public class Consumer
>> {
>> private final static String TOPIC = "my-example-topi8";
>> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>> public static void main( String[] args ) throws Exception {
>> Consumer consumer = createConsumer();
>> start(consumer);
>> }
>>
>> static void start(Consumer consumer) throws
>> InterruptedException {
>> final int giveUp = 10;
>> int noRecordsCount = 0;
>> int stopCount = 1000;
>>
>> while (true) {
>> final ConsumerRecords consumerRecords =
>> consumer.poll(1000);
>> if (consumerRecords.count()==0) {
>> noRecordsCount++;
>> if (noRecordsCount > giveUp) break;
>> else continue;
>> }
>>
>>
>> consumerRecords.forEach(record -> {
>> System.out.printf("\nConsumer Record:(%s, %s, %s)",
>> record.key(), new String(record.value()), record.topic());
>> });
>>
>> consumer.commitSync();
>> break;
>> }
>> consumer.close();
>> System.out.println("DONE");
>> }
>>
>> private static Consumer createConsumer() {
>> final Properties props = new Properties();
>> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> BOOTSTRAP_SERVERS);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG,
>> "KafkaExampleConsumer");
>> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> StringDeserializer.class.getName());
>> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> ByteArrayDeserializer.class.getName());
>> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
>> props.put("group.id", "test");
>> props.put("enable.auto.commit", "false");
>>
>> // Create the consumer using props.
>> final Consumer consumer = new KafkaConsumer(props);
>> consumer.subscribe(Collections.singletonList(TOPIC));
>> return consumer;
>> }
>> }
>>
>> But the consumer is not reading any message from kafka. If I add the below
>> at the very start()
>>
>> consumer.poll(0);
>>
>> consumer.seekToBeginning(consumer.assignment());
>>
>>
>> Then the consumer starts reading from the topic. But then each time the
>> consumer is restarted it is reading message from the start of the topic
>> which I don;t want. Can someone let me know what is going wrong and how can
>> I fix this?
>>
>>
>> Kafka Version 0.10
>>
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Java Consumer Not reading message -

2018-02-16 Thread Debraj Manna
I have posted the same question in stackoverflow also. But I have not got
any reply there also

https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic

On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna 
wrote:

> I have a simple java producer like below
>
> public class Producer
> {
> private final static String TOPIC = "my-example-topi8";
> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
> public static void main( String[] args ) throws Exception {
> Producer producer = createProducer();
> for(int i=0;i<3000;i++) {
> String msg = "Test Message-" + i;
> final ProducerRecord record = new
> ProducerRecord(TOPIC, "key" + i, msg.getBytes());
> producer.send(record).get();
> System.out.println("Sent message " + msg);
> }
> producer.close();
> }
>
> private static Producer createProducer() {
> Properties props = new Properties();
> props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
> props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
> props.put("client.id", "AppFromJava");
> props.put("serializer.class", "kafka.serializer.DefaultEncoder");
> props.put("key.serializer.class", "kafka.serializer.
> StringEncoder");
> props.put("key.serializer", "org.apache.kafka.common.
> serialization.StringSerializer");
> props.put("compression.codec", "snappy");
> props.put("value.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
> return new KafkaProducer(props);
> }
> }
>
> I am trying to read data as below
>
> public class Consumer
> {
> private final static String TOPIC = "my-example-topi8";
> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
> public static void main( String[] args ) throws Exception {
> Consumer consumer = createConsumer();
> start(consumer);
> }
>
> static void start(Consumer consumer) throws
> InterruptedException {
> final int giveUp = 10;
> int noRecordsCount = 0;
> int stopCount = 1000;
>
> while (true) {
> final ConsumerRecords consumerRecords =
> consumer.poll(1000);
> if (consumerRecords.count()==0) {
> noRecordsCount++;
> if (noRecordsCount > giveUp) break;
> else continue;
> }
>
>
> consumerRecords.forEach(record -> {
> System.out.printf("\nConsumer Record:(%s, %s, %s)",
> record.key(), new String(record.value()), record.topic());
> });
>
> consumer.commitSync();
> break;
> }
> consumer.close();
> System.out.println("DONE");
> }
>
> private static Consumer createConsumer() {
> final Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> BOOTSTRAP_SERVERS);
> props.put(ConsumerConfig.GROUP_ID_CONFIG,
> "KafkaExampleConsumer");
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class.getName());
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> ByteArrayDeserializer.class.getName());
> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
> props.put("group.id", "test");
> props.put("enable.auto.commit", "false");
>
> // Create the consumer using props.
> final Consumer consumer = new KafkaConsumer(props);
> consumer.subscribe(Collections.singletonList(TOPIC));
> return consumer;
> }
> }
>
> But the consumer is not reading any message from kafka. If I add the below
> at the very start()
>
> consumer.poll(0);
>
> consumer.seekToBeginning(consumer.assignment());
>
>
> Then the consumer starts reading from the topic. But then each time the
> consumer is restarted it is reading message from the start of the topic
> which I don;t want. Can someone let me know what is going wrong and how can
> I fix this?
>
>
> Kafka Version 0.10
>
>
>


Java Consumer Not reading message -

2018-02-16 Thread Debraj Manna
I have a simple java producer like below

public class Producer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";

public static void main( String[] args ) throws Exception {
Producer producer = createProducer();
for(int i=0;i<3000;i++) {
String msg = "Test Message-" + i;
final ProducerRecord record = new
ProducerRecord(TOPIC, "key" + i, msg.getBytes());
producer.send(record).get();
System.out.println("Sent message " + msg);
}
producer.close();
}

private static Producer createProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("client.id", "AppFromJava");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.codec", "snappy");
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer(props);
}
}

I am trying to read data as below

public class Consumer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";

public static void main( String[] args ) throws Exception {
Consumer consumer = createConsumer();
start(consumer);
}

static void start(Consumer consumer) throws
InterruptedException {
final int giveUp = 10;
int noRecordsCount = 0;
int stopCount = 1000;

while (true) {
final ConsumerRecords consumerRecords =
consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}


consumerRecords.forEach(record -> {
System.out.printf("\nConsumer Record:(%s, %s, %s)",
record.key(), new String(record.value()), record.topic());
});

consumer.commitSync();
break;
}
consumer.close();
System.out.println("DONE");
}

private static Consumer createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");

// Create the consumer using props.
final Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
}

But the consumer is not reading any message from kafka. If I add the below
at the very start()

consumer.poll(0);

consumer.seekToBeginning(consumer.assignment());


Then the consumer starts reading from the topic. But then each time the
consumer is restarted it is reading message from the start of the topic
which I don;t want. Can someone let me know what is going wrong and how can
I fix this?


Kafka Version 0.10