Re: Java Consumer Not reading message -
Thanks Matthias for replying. The answer has been discussed the stackoverflow link which I have posted in the question. On 16-Feb-2018 11:35 PM, "Matthias J. Sax" wrote: Can you check the committed offsets using bin/kafka-consumer-group.sh ? Also inspect your consumer's position via KafkaConsumer#position() to see where the consumer actually is in the topic. -Matthias On 2/16/18 5:13 AM, Debraj Manna wrote: > I have posted the same question in stackoverflow also. But I have not got > any reply there also > > https://stackoverflow.com/questions/48826279/kafka-0-10- java-consumer-not-reading-message-from-topic > > On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna > wrote: > >> I have a simple java producer like below >> >> public class Producer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Producer producer = createProducer(); >> for(int i=0;i<3000;i++) { >> String msg = "Test Message-" + i; >> final ProducerRecord record = new >> ProducerRecord(TOPIC, "key" + i, msg.getBytes()); >> producer.send(record).get(); >> System.out.println("Sent message " + msg); >> } >> producer.close(); >> } >> >> private static Producer createProducer() { >> Properties props = new Properties(); >> props.put("metadata.broker.list", BOOTSTRAP_SERVERS); >> props.put("bootstrap.servers", BOOTSTRAP_SERVERS); >> props.put("client.id", "AppFromJava"); >> props.put("serializer.class", "kafka.serializer.DefaultEncoder"); >> props.put("key.serializer.class", "kafka.serializer. >> StringEncoder"); >> props.put("key.serializer", "org.apache.kafka.common. >> serialization.StringSerializer"); >> props.put("compression.codec", "snappy"); >> props.put("value.serializer", "org.apache.kafka.common. >> serialization.ByteArraySerializer"); >> return new KafkaProducer(props); >> } >> } >> >> I am trying to read data as below >> >> public class Consumer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Consumer consumer = createConsumer(); >> start(consumer); >> } >> >> static void start(Consumer consumer) throws >> InterruptedException { >> final int giveUp = 10; >> int noRecordsCount = 0; >> int stopCount = 1000; >> >> while (true) { >> final ConsumerRecords consumerRecords = >> consumer.poll(1000); >> if (consumerRecords.count()==0) { >> noRecordsCount++; >> if (noRecordsCount > giveUp) break; >> else continue; >> } >> >> >> consumerRecords.forEach(record -> { >> System.out.printf("\nConsumer Record:(%s, %s, %s)", >> record.key(), new String(record.value()), record.topic()); >> }); >> >> consumer.commitSync(); >> break; >> } >> consumer.close(); >> System.out.println("DONE"); >> } >> >> private static Consumer createConsumer() { >> final Properties props = new Properties(); >> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, >> BOOTSTRAP_SERVERS); >> props.put(ConsumerConfig.GROUP_ID_CONFIG, >> "KafkaExampleConsumer"); >> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >> StringDeserializer.class.getName()); >> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> ByteArrayDeserializer.class.getName()); >> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234"); >> props.put("group.id", "test"); >> props.put("enable.auto.commit", "false"); >> >> // Create the consumer using props. >> final Consumer consumer = new KafkaConsumer(props); >> consumer.subscribe(Collections.singletonList(TOPIC)); >> return consumer; >> } >> } >> >> But the consumer is not reading any message from kafka. If I add the below >> at the very start() >> >> consumer.poll(0); >> >> consumer.seekToBeginning(consumer.assignment()); >> >> >> Then the consumer starts reading from the topic. But then each time the >> consumer is restarted it is reading message from the start of the topic >> which I don;t want. Can someone let me know what is going wrong and how can >> I fix this? >> >> >> Kafka Version 0.10 >> >> >> >
Re: Java Consumer Not reading message -
Can you check the committed offsets using bin/kafka-consumer-group.sh ? Also inspect your consumer's position via KafkaConsumer#position() to see where the consumer actually is in the topic. -Matthias On 2/16/18 5:13 AM, Debraj Manna wrote: > I have posted the same question in stackoverflow also. But I have not got > any reply there also > > https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic > > On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna > wrote: > >> I have a simple java producer like below >> >> public class Producer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Producer producer = createProducer(); >> for(int i=0;i<3000;i++) { >> String msg = "Test Message-" + i; >> final ProducerRecord record = new >> ProducerRecord(TOPIC, "key" + i, msg.getBytes()); >> producer.send(record).get(); >> System.out.println("Sent message " + msg); >> } >> producer.close(); >> } >> >> private static Producer createProducer() { >> Properties props = new Properties(); >> props.put("metadata.broker.list", BOOTSTRAP_SERVERS); >> props.put("bootstrap.servers", BOOTSTRAP_SERVERS); >> props.put("client.id", "AppFromJava"); >> props.put("serializer.class", "kafka.serializer.DefaultEncoder"); >> props.put("key.serializer.class", "kafka.serializer. >> StringEncoder"); >> props.put("key.serializer", "org.apache.kafka.common. >> serialization.StringSerializer"); >> props.put("compression.codec", "snappy"); >> props.put("value.serializer", "org.apache.kafka.common. >> serialization.ByteArraySerializer"); >> return new KafkaProducer(props); >> } >> } >> >> I am trying to read data as below >> >> public class Consumer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Consumer consumer = createConsumer(); >> start(consumer); >> } >> >> static void start(Consumer consumer) throws >> InterruptedException { >> final int giveUp = 10; >> int noRecordsCount = 0; >> int stopCount = 1000; >> >> while (true) { >> final ConsumerRecords consumerRecords = >> consumer.poll(1000); >> if (consumerRecords.count()==0) { >> noRecordsCount++; >> if (noRecordsCount > giveUp) break; >> else continue; >> } >> >> >> consumerRecords.forEach(record -> { >> System.out.printf("\nConsumer Record:(%s, %s, %s)", >> record.key(), new String(record.value()), record.topic()); >> }); >> >> consumer.commitSync(); >> break; >> } >> consumer.close(); >> System.out.println("DONE"); >> } >> >> private static Consumer createConsumer() { >> final Properties props = new Properties(); >> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, >> BOOTSTRAP_SERVERS); >> props.put(ConsumerConfig.GROUP_ID_CONFIG, >> "KafkaExampleConsumer"); >> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >> StringDeserializer.class.getName()); >> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> ByteArrayDeserializer.class.getName()); >> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234"); >> props.put("group.id", "test"); >> props.put("enable.auto.commit", "false"); >> >> // Create the consumer using props. >> final Consumer consumer = new KafkaConsumer(props); >> consumer.subscribe(Collections.singletonList(TOPIC)); >> return consumer; >> } >> } >> >> But the consumer is not reading any message from kafka. If I add the below >> at the very start() >> >> consumer.poll(0); >> >> consumer.seekToBeginning(consumer.assignment()); >> >> >> Then the consumer starts reading from the topic. But then each time the >> consumer is restarted it is reading message from the start of the topic >> which I don;t want. Can someone let me know what is going wrong and how can >> I fix this? >> >> >> Kafka Version 0.10 >> >> >> > signature.asc Description: OpenPGP digital signature
Re: Java Consumer Not reading message -
I have posted the same question in stackoverflow also. But I have not got any reply there also https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna wrote: > I have a simple java producer like below > > public class Producer > { > private final static String TOPIC = "my-example-topi8"; > private final static String BOOTSTRAP_SERVERS = "localhost:8092"; > > public static void main( String[] args ) throws Exception { > Producer producer = createProducer(); > for(int i=0;i<3000;i++) { > String msg = "Test Message-" + i; > final ProducerRecord record = new > ProducerRecord(TOPIC, "key" + i, msg.getBytes()); > producer.send(record).get(); > System.out.println("Sent message " + msg); > } > producer.close(); > } > > private static Producer createProducer() { > Properties props = new Properties(); > props.put("metadata.broker.list", BOOTSTRAP_SERVERS); > props.put("bootstrap.servers", BOOTSTRAP_SERVERS); > props.put("client.id", "AppFromJava"); > props.put("serializer.class", "kafka.serializer.DefaultEncoder"); > props.put("key.serializer.class", "kafka.serializer. > StringEncoder"); > props.put("key.serializer", "org.apache.kafka.common. > serialization.StringSerializer"); > props.put("compression.codec", "snappy"); > props.put("value.serializer", "org.apache.kafka.common. > serialization.ByteArraySerializer"); > return new KafkaProducer(props); > } > } > > I am trying to read data as below > > public class Consumer > { > private final static String TOPIC = "my-example-topi8"; > private final static String BOOTSTRAP_SERVERS = "localhost:8092"; > > public static void main( String[] args ) throws Exception { > Consumer consumer = createConsumer(); > start(consumer); > } > > static void start(Consumer consumer) throws > InterruptedException { > final int giveUp = 10; > int noRecordsCount = 0; > int stopCount = 1000; > > while (true) { > final ConsumerRecords consumerRecords = > consumer.poll(1000); > if (consumerRecords.count()==0) { > noRecordsCount++; > if (noRecordsCount > giveUp) break; > else continue; > } > > > consumerRecords.forEach(record -> { > System.out.printf("\nConsumer Record:(%s, %s, %s)", > record.key(), new String(record.value()), record.topic()); > }); > > consumer.commitSync(); > break; > } > consumer.close(); > System.out.println("DONE"); > } > > private static Consumer createConsumer() { > final Properties props = new Properties(); > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, > BOOTSTRAP_SERVERS); > props.put(ConsumerConfig.GROUP_ID_CONFIG, > "KafkaExampleConsumer"); > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > StringDeserializer.class.getName()); > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > ByteArrayDeserializer.class.getName()); > props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234"); > props.put("group.id", "test"); > props.put("enable.auto.commit", "false"); > > // Create the consumer using props. > final Consumer consumer = new KafkaConsumer(props); > consumer.subscribe(Collections.singletonList(TOPIC)); > return consumer; > } > } > > But the consumer is not reading any message from kafka. If I add the below > at the very start() > > consumer.poll(0); > > consumer.seekToBeginning(consumer.assignment()); > > > Then the consumer starts reading from the topic. But then each time the > consumer is restarted it is reading message from the start of the topic > which I don;t want. Can someone let me know what is going wrong and how can > I fix this? > > > Kafka Version 0.10 > > >
Java Consumer Not reading message -
I have a simple java producer like below public class Producer { private final static String TOPIC = "my-example-topi8"; private final static String BOOTSTRAP_SERVERS = "localhost:8092"; public static void main( String[] args ) throws Exception { Producer producer = createProducer(); for(int i=0;i<3000;i++) { String msg = "Test Message-" + i; final ProducerRecord record = new ProducerRecord(TOPIC, "key" + i, msg.getBytes()); producer.send(record).get(); System.out.println("Sent message " + msg); } producer.close(); } private static Producer createProducer() { Properties props = new Properties(); props.put("metadata.broker.list", BOOTSTRAP_SERVERS); props.put("bootstrap.servers", BOOTSTRAP_SERVERS); props.put("client.id", "AppFromJava"); props.put("serializer.class", "kafka.serializer.DefaultEncoder"); props.put("key.serializer.class", "kafka.serializer.StringEncoder"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("compression.codec", "snappy"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); return new KafkaProducer(props); } } I am trying to read data as below public class Consumer { private final static String TOPIC = "my-example-topi8"; private final static String BOOTSTRAP_SERVERS = "localhost:8092"; public static void main( String[] args ) throws Exception { Consumer consumer = createConsumer(); start(consumer); } static void start(Consumer consumer) throws InterruptedException { final int giveUp = 10; int noRecordsCount = 0; int stopCount = 1000; while (true) { final ConsumerRecords consumerRecords = consumer.poll(1000); if (consumerRecords.count()==0) { noRecordsCount++; if (noRecordsCount > giveUp) break; else continue; } consumerRecords.forEach(record -> { System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic()); }); consumer.commitSync(); break; } consumer.close(); System.out.println("DONE"); } private static Consumer createConsumer() { final Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); // Create the consumer using props. final Consumer consumer = new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(TOPIC)); return consumer; } } But the consumer is not reading any message from kafka. If I add the below at the very start() consumer.poll(0); consumer.seekToBeginning(consumer.assignment()); Then the consumer starts reading from the topic. But then each time the consumer is restarted it is reading message from the start of the topic which I don;t want. Can someone let me know what is going wrong and how can I fix this? Kafka Version 0.10