Hi,
I am kind of new to Kafka. I have set up a 3-node Kafka cluster (1 broker per
machine) with a 3-node ZooKeeper cluster. I am using Kafka version 0.9.0.0.
The setup works fine: from a single producer I push JSON strings to a topic
with 3 partitions and a replication factor of 2. On the consumer side I have
an application with 3 consumer threads (I assume each consumer thread reads
from its own dedicated partition). Each consumer reads the JSON and persists
it to the DB in a separate thread.
Following are consumer properties:
topic=TestTopic2807
bootstrap.servers=XXX.221:9092,XXX.222:9092,XXX.221:9092
topic.consumer.threads=3
group.id=EOTG
client.id=EOTG
enable.auto.commit=true
auto.commit.interval.ms=10000
session.timeout.ms=30000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
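In case it helps, each of the three threads builds its own consumer from these
properties and subscribes to the topic. This is only a simplified sketch of
that part (the properties file path and variable names are placeholders, not
my exact code):

import java.io.FileInputStream;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Each thread gets its own KafkaConsumer; all three share group.id=EOTG,
// so Kafka should give each thread one of the 3 partitions.
Properties props = new Properties();
props.load(new FileInputStream("consumer.properties")); // placeholder path

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(props.getProperty("topic")));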
The consumer thread routine is as follows. Each consumer runs the code below in
its own thread and spawns a new thread for the DB operation (I know a DB
operation failure can be an issue, but I will fix that soon):
ConsumerRecords<String, String> records = consumer.poll(20);
if (!records.isEmpty()) {
    for (ConsumerRecord<String, String> record : records) {
        String eOCJSONString = record.value();
        logger.info("Received the record at consumer id:" + consumerId +
                ". Record topic:" + record.topic() +
                ". Record partition:" + record.partition() +
                ". Record offset id:" + record.offset());
        logger.info("\n Record:" + eOCJSONString);
        if (eOCJSONString.startsWith("{")) {
            EOCBean ocBean = gson.fromJson(eOCJSONString, EOCBean.class);
            // hand the DB persistence off to a worker thread
            executorServiceWorker.submit(new OCWorker(ocBean, consumerId));
            :
}
The problem occurs when I load test the application by sending 30k messages
(JSONs) from the single producer and bring down one of the brokers while the
consumers are consuming. I can see that many of the messages are processed in
duplicate (~200-800). I repeated this experiment a few times and always
noticed many messages being read in duplicate by the consumer threads. I tried
bringing down one broker and also two brokers.
Is it normal for this to happen?
Should I switch to manual offset commits instead of auto commit? (A rough
sketch of what I mean is just below these questions.)
Or should I manually assign the partitions in the program rather than letting
the brokers manage the assignment?
Am I missing something very important here?
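For reference, this is roughly what I mean by switching to manual offset
commit: disable auto commit and call commitSync() only after the polled batch
has been handled. This is just a sketch of the idea, not my actual code:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
// ... same properties as above, except:
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("TestTopic2807"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // parse the JSON and hand it to the DB worker as before
    }
    consumer.commitSync(); // commit offsets only after the batch is processed
}

// And by manual assignment I mean skipping subscribe() and the group
// rebalance, e.g.:
// consumer.assign(Arrays.asList(new TopicPartition("TestTopic2807", 0)));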
Also, I observed that Kafka-Python had a similar bug which was fixed in 0.9.2
(https://github.com/dpkp/kafka-python/issues/189), but I believe no such issue
has been reported for the Java client.
Thanks,