Producer not producing msgs
Hi all, We are using kafka - 0.8 in our production setup. We have 2 kafka servers and 2 zookeepers. There seem to be some sort of network problem in our DC. As a result of this, connection refused exception in replica which was thrown , is shown here. Our producer didnt throw any exception. its log is normal as shown here. But our consumers couldnt consume from kafka. When i restart kafka servers, it worked correctly. Is there any approach to overcome this in future ??? Any help in resolving this issue will be appreaciated. PS : Our producers having configuration "request.required.acks=1"
TCP_TOO_MANY_ESTAB_CONNS
Dear all, We started using kafka as a queue server recently. Our production setup has 3 kafka servers and 3 zookeepers. We having 30 topics with 10 partitions each. We have producer pool(10 producers / APP_SERVERS). So totally 30 producers. With our monitoring tool, we could detect "TCP_TOO_MANY_ESTAB_CONNS" warning message frequently. Why is this warning comes so frequently ?? Are we are missing something ??? PS : Our monitoring tool's threshold is 600 connections thanks in advance Ranjith Venkatesan
Log retention policy - offset ?
Hi,We are about to use kafka-0.8 in our production. We have our config to delete messages less than a day i.e 24 hrs. We have our simple consumer which will have its offset in memory and will handle if offsetoutofrange exception occurs.Suppose if old log messages are getting deleted what will happens to offset ? Let me give an example,Suppose we have 10 messages in which 3 messages are older than 24 hrs. My consumer consumed upto message 5 (offset 4). still 5 more msgs yet to consumeAfter log deletion what will be the offset ? 1. If offset gonna be 0 for message 4, then our consumer will consume only one message. (4 msgs lost ?)2. If offset gonna be 3 for message 4, then its fine. But what happens if offset keeps on increasing and reaches its maximum ? (just curious)Correct me if i am wrong.Thanks in advanceRanjith Venkatesan
SimpleConsumer not consuming messages
Hi, We are evaluating kafka-0.8 for our product. We will start consumer for each partition. When i try to consume using High-Level API i could able to consume from kafka. But when i try to consume from kafka using Low-Level API, i am getting message size as 0. Am i missing some configuration? PS: I have attached the test code used for testing SimpleConsumer Thanks in advance Ranjith Venkatesan package kafka.examples; import java.util.logging.Level; import java.util.logging.Logger; import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.common.ErrorMapping; import kafka.javaapi.FetchResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; //$Id$ public class SimpleKafkaConsumer implements Runnable { private static final Logger LOGGER = Logger.getLogger(SimpleKafkaConsumer.class.getName()); private static final int TIME_OUT = 10; private static final int BUFFER_SIZE = 64 * 1024; private static final int FETCH_SIZE = 1; private int partitionId; private int errorCount = 0; private Long readOffset; private Long processedOffset = 0l; private String leader; private String clientName; private String topic; private SimpleConsumer consumer = null; private SimpleKafkaConsumer(String topic, int partitionId, Long readOffset, String leader) throws Exception { this.topic = topic; this.partitionId = partitionId; this.readOffset = readOffset; this.leader = leader; //init clientName = KafkaConsumerUtil.formClientName(topic, partitionId); } /** * Fetches messages, writes to underlying Writer and schedules itself for * execution. */ @Override public void run() { int i=0; while(true) { try { //check leader status, if null get leader from kafka broker try { if (leader == null) { leader = KafkaConsumerUtil.findLeader(topic, partitionId, 5); } } catch (Exception e) { LOGGER.log(Level.SEVERE, "Cannot find leader for topic " + topic + " of partition " + partitionId + ", will retry at next schedule,", e); } //leader cannot be found, try at next execution. if (leader == null) { return; } //check simpleconsumer status, if null create new simpleconsumer instance if (consumer == null) { LOGGER.log(Level.INFO, "Leader for {0} is {1}.", new Object[] { clientName, leader }); consumer = new SimpleConsumer(leader.split(":")[0], Integer.parseInt(leader.split(":")[1]), TIME_OUT, BUFFER_SIZE, clientName); } //check read offset status, if null get earliest offset from kafka broker if (readOffset == null) { readOffset = KafkaConsumerUtil.getOffset(consumer, topic, partitionId, true, clientName); } //fetch message ByteBufferMessageSet fetchedMessages = handleFetch(consumer, topic, partitionId, readOffset); LOGGER.log(Level.INFO, "Reading from offset {0} for {1}.", new Object[] { readOffset, clientName }); //read fetched messages and write if (fetchedMessages != null) { LOGGER.log(Level.INFO, "Fetched message of size {0} bytes.", fetchedMessages.sizeInBytes()); for (MessageAndOffset messageAndOffset : fetchedMessages) { long currentOffset = messageAndOffset.offset()
Connection reset by peer
Hi , We are evaluating kafka 0.8 for our product as a queue system. Our architecture remains simple. Our producer (single) will send mesages to any of the topics in broker. Thread will be running for each of the topic for every 10 secs,which in turn consume from its corresponding topic. For each stream we will be using ThreadPool to increase the degree of parallelism. But when shutting down the Consumer, i am getting following exception in broker, [2014-02-01 13:06:43,240] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233) at sun.nio.ch.IOUtil.read(IOUtil.java:206) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236) at kafka.utils.Utils$.read(Utils.scala:395) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Processor.read(SocketServer.scala:347) at kafka.network.Processor.run(SocketServer.scala:245) at java.lang.Thread.run(Thread.java:619) I am using Highlevel consumer, and the configuration for consumer is given below, Properties props = new Properties(); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", groupid); props.put("zookeeper.session.timeout.ms", "12000"); props.put("zookeeper.sync.time.ms", "200"); // props.put("auto.commit.interval.ms", "1000"); props.put("auto.commit.enable", "false"); //props.put("consumer.timeout.ms", "1"); props.put("auto.offset.reset", "smallest"); props.put("socket.timeout.ms", "1"); Am i missing something ? Or this is a expected exception??? PS: We are using Java Client Thanks in advance Ranjith Venkatesan