don caldwell created KAFKA-3972:
-----------------------------------
Summary: kafka java consumer poll returns 0 records after
seekToBeginning
Key: KAFKA-3972
URL: https://issues.apache.org/jira/browse/KAFKA-3972
Project: Kafka
Issue Type: Task
Components: consumer
Affects Versions: 0.10.0.0
Environment: docker image elasticsearch:latest, kafka scala version
2.11, kafka version 0.10.0.0
Reporter: don caldwell
kafkacat successfully returns rows for the topic, but the following java source
reliably fails to produce rows. I have the suspicion that I am missing some
simple thing in my setup, but I have been unable to find a way out. I am using
the current docker and using docker network commands to connect the processes
in my cluster. The properties are:
bootstrap.servers: kafka01:9092,kafka02:9092,kafka03:9092
group.id: dhcp1
topic: dhcp
enable.auto.commit: false
auto.commit.interval.ms: 1000
session.timeout.ms 30000
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
the kafka consumer follows. One thing that I find curious is that, although I
seem to successfully make the call to seekToBeginning(), when I print offsets
on failure, I get large offsets for all partitions although I had expected them
to be 0 or at least some small number.
Here is the code:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.Integer;
import java.lang.System;
import java.lang.Thread;
import java.lang.InterruptedException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KConsumer {
private Properties prop;
private String topic;
private Integer polln;
private KafkaConsumer<String, String> consumer;
private String[] pna = {ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
ConsumerConfig.GROUP_ID_CONFIG,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG};
public KConsumer(String pf) throws FileNotFoundException,
IOException {
this.setProperties(pf);
this.newClient();
}
public void setProperties(String p) throws FileNotFoundException,
IOException {
this.prop = new Properties();
this.prop.load(new FileInputStream(p));
this.topic = this.prop.getProperty("topic");
this.polln = new Integer(this.prop.getProperty("polln"));
}
public void setTopic(String t) {
this.topic = t;
}
public String getTopic() {
return this.topic;
}
public void newClient() {
System.err.println("creating consumer");
Properties kp = new Properties();
for(String p : pna) {
String v = this.prop.getProperty(p);
if(v != null) {
kp.put(p, v);
}
}
//this.consumer = new KafkaConsumer<>(this.prop);
this.consumer = new KafkaConsumer<>(kp);
//this.consumer.subscribe(Collections.singletonList(this.topic));
System.err.println("subscribing to " + this.topic);
this.consumer.subscribe(Arrays.asList(this.topic));
//this.seekToBeginning();
}
public void close() {
this.consumer.close();
this.consumer = null;
}
public void seekToBeginning() {
if(this.topic == null) {
System.err.println("KConsumer: topic not set");
System.exit(1);
}
System.err.println("setting partition offset to beginning");
java.util.Set<TopicPartition> tps = this.consumer.assignment();
this.consumer.seekToBeginning(tps);
}
public ConsumerRecords<String,String> nextBatch()
throws KafkaException {
while(true) {
try {
System.err.printf("polling...");
ConsumerRecords<String,String> records =
this.consumer.poll(this.polln);
System.err.println("returned");
System.err.printf("record count %d\n", records.count());
return records;
} catch(SchemaException e) {
System.err.println("nextBatch: " + e);
} catch(KafkaException e) {
System.err.println("nextBatch: " + e);
throw e;
} catch(Exception e) {
System.err.println("nextBatch: " + e);
this.consumer.close();
System.exit(1);
}
try {
System.err.println("sleeping");
Thread.sleep(2000);
} catch(InterruptedException e) {
System.err.println(e);
System.exit(0);
}
}
}
public void printBatch(ConsumerRecords<String,String> records) {
System.err.println("printing...");
Iterable<ConsumerRecord<String,String>> ri =
records.records(this.topic);
for (ConsumerRecord<String, String> record : ri) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(),
record.value());
}
}
public void doProcess() {
Integer n = 0;
Integer f = 0;
long total = 0;
try {
while(true) {
ConsumerRecords<String,String> r = this.nextBatch();
long count = r.count();
if(r.count() > 0) {
total += count;
this.printBatch(r);
n = n + 1;
} else {
f = f + 1;
}
if(f > 10) {
System.err.printf("total %d\n", total);
this.printMisc();
break;
}
}
} finally {
this.consumer.close();
}
}
public void printPosition(int pid) {
try {
TopicPartition tp = new TopicPartition(this.topic, pid);
long pos = this.consumer.position(tp);
System.err.printf(" offset: %d\n", pos);
} catch(IllegalArgumentException e) {
System.err.printf("printPosition: %d %s\n", pid, e);
}
}
public void printMisc() {
Map<String,List<PartitionInfo>> topicMap;
List<PartitionInfo> partitionList;
System.err.println("in printMisc");
try {
topicMap = this.consumer.listTopics();
for(String key: topicMap.keySet()) {
if(key.compareTo(this.topic) != 0) continue;
System.err.printf("topic: %s\n", key);
List<PartitionInfo> pl = topicMap.get(key);
for(PartitionInfo pinf: pl) {
System.err.printf("partition %d\n", pinf.partition());
System.err.printf(" leader %s\n",
pinf.leader().host());
this.printPosition(pinf.partition());
System.err.printf(" replicas:\n");
for(Node r: pinf.replicas()) {
System.err.printf(" %s %s\n", r.id(), r.host());
}
System.err.printf(" inSyncReplicas:\n");
for(Node r: pinf.inSyncReplicas()) {
System.err.printf(" %s %s\n", r.id(), r.host());
}
}
}
} catch (TimeoutException e) {
System.err.printf("printMisc: %s\n", e);
//System.exit(1);
}
}
public static void main(String[] args) throws FileNotFoundException,
IOException, InterruptedException {
if(args.length == 1) {
Thread.sleep(2000); // docker network connect
KConsumer kc = new KConsumer(args[0]);
//kc.printMisc();
kc.doProcess();
} else {
System.err.println("Usage KConsumer propfile");
System.exit(1);
}
}
}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)