Producer not producing messages

2014-05-21 Thread Ranjith Venkatesan
Hi all,

  We are using Kafka 0.8 in our production setup. We have 2 Kafka servers
and 2 ZooKeepers. There seems to have been some sort of network problem in our
DC. As a result, a connection-refused exception was thrown in the replica, as
shown here. Our producer did not throw any exception and its log is normal, as
shown here. But our consumers could not consume from Kafka.


When I restarted the Kafka servers, everything worked correctly. Is there any
approach to overcome this in the future?


Any help in resolving this issue will be appreciated.


PS: Our producers have the configuration "request.required.acks=1"
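
For reference, our producers are created roughly like this (broker hosts and
topic are placeholders); with request.required.acks=1 the broker acks as soon
as the leader has the message, which would explain why the producer logs
stayed clean while replication was failing:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerExample
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "kafka1:9092,kafka2:9092"); // placeholder hosts
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // 1 = ack once the leader has written the message; replica or
        // consumer-side failures are not reported back to the producer
        props.put("request.required.acks", "1");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test-topic", "hello")); // placeholder topic
        producer.close();
    }
}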

TCP_TOO_MANY_ESTAB_CONNS

2014-05-13 Thread Ranjith Venkatesan
Dear all,

  We started using Kafka as a queue server recently. Our production setup
has 3 Kafka servers and 3 ZooKeepers. We have 30 topics with 10 partitions
each, and a producer pool (10 producers per app server), so 30 producers in
total. Our monitoring tool frequently raises the "TCP_TOO_MANY_ESTAB_CONNS"
warning. Why does this warning come up so frequently? Are we missing
something?


PS: Our monitoring tool's threshold is 600 connections
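
Back-of-envelope: if each producer keeps one connection per broker, 30
producers x 3 brokers is up to ~90 established connections from producers
alone, before counting consumers, replication, and ZooKeeper. Would a single
shared producer per app server be the right fix? A sketch (the 0.8 Java
producer is documented as thread-safe; hosts are placeholders):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sketch: one shared producer per JVM instead of a pool of 10. All sender
// threads can use the same instance; fewer instances means fewer broker
// connections.
public final class SharedProducer
{
    private static final Producer<String, String> PRODUCER = create();

    private static Producer<String, String> create()
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "kafka1:9092,kafka2:9092,kafka3:9092"); // placeholders
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return new Producer<String, String>(new ProducerConfig(props));
    }

    public static void send(String topic, String message)
    {
        PRODUCER.send(new KeyedMessage<String, String>(topic, message));
    }
}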


Thanks in advance


Ranjith Venkatesan






Log retention policy - offset ?

2014-03-27 Thread Ranjith Venkatesan
Hi,

We are about to use Kafka 0.8 in production. Our config deletes messages older
than a day, i.e. 24 hrs. We have a simple consumer which keeps its offset in
memory and handles OffsetOutOfRangeException if it occurs.

If old log messages get deleted, what happens to the offset? Let me give an
example: suppose we have 10 messages, of which the 3 oldest are more than 24
hrs old, and my consumer has consumed up to message 5 (offset 4), with 5 more
messages yet to consume. After log deletion, what will the offset be?

1. If the offset becomes 0 for message 4, then our consumer will consume only
one message (4 messages lost?).
2. If the offset stays 3 for message 4, then it is fine. But what happens if
the offset keeps increasing and reaches its maximum? (Just curious.)

Correct me if I am wrong.

Thanks in advance

Ranjith Venkatesan
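
For reference, when we hit OffsetOutOfRangeException we plan to re-read the
earliest offset the broker still retains, roughly like this (a sketch; the
SimpleConsumer and clientName come from our setup):

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetRecovery
{
    // Asks the broker for the earliest offset it still retains for the
    // partition. After segment deletion this moves forward, but surviving
    // messages keep their original per-partition 64-bit offsets.
    public static long earliestOffset(SimpleConsumer consumer, String topic,
            int partition, String clientName)
    {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(tp, new PartitionOffsetRequestInfo(
                kafka.api.OffsetRequest.EarliestTime(), 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        return response.offsets(topic, partition)[0];
    }
}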

SimpleConsumer not consuming messages

2014-02-24 Thread Ranjith Venkatesan
Hi, 

  We are evaluating Kafka 0.8 for our product. We will start a consumer for
each partition. When I try to consume using the high-level API, I am able to
consume from Kafka. But when I try to consume using the low-level API, I get a
message size of 0. Am I missing some configuration?




PS: I have attached the test code used for testing SimpleConsumer
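
One thing I am unsure about: the fetchSize argument of addFetch appears to be
a per-partition byte budget rather than a message count, so a very small value
can return an empty message set. A fetch with a roomier budget would look
like this (a sketch; names as in the attached code):

FetchRequest request = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(topic, partitionId, readOffset, 100 * 1024) // bytes, not messages
        .build();
FetchResponse response = consumer.fetch(request);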



Thanks in advance


Ranjith Venkatesan




package kafka.examples;

import java.util.logging.Level;
import java.util.logging.Logger;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;

//$Id$

public class SimpleKafkaConsumer implements Runnable
{
    private static final Logger LOGGER =
            Logger.getLogger(SimpleKafkaConsumer.class.getName());

    private static final int TIME_OUT = 10;
    private static final int BUFFER_SIZE = 64 * 1024;
    // NOTE: the fetch size is in bytes; 1 byte is smaller than any message,
    // so a fetch can legally return an empty (0-byte) message set
    private static final int FETCH_SIZE = 1;

    private int partitionId;
    private int errorCount = 0;
    private Long readOffset;
    private Long processedOffset = 0L;
    private String leader;
    private String clientName;
    private String topic;

    private SimpleConsumer consumer = null;

    private SimpleKafkaConsumer(String topic, int partitionId, Long readOffset,
            String leader) throws Exception
    {
        this.topic = topic;
        this.partitionId = partitionId;
        this.readOffset = readOffset;
        this.leader = leader;

        // init
        clientName = KafkaConsumerUtil.formClientName(topic, partitionId);
    }

    /**
     * Fetches messages, writes to the underlying Writer and schedules itself
     * for execution.
     */
    @Override
    public void run()
    {
        while (true)
        {
            try
            {
                // check leader status; if null, get leader from kafka broker
                try
                {
                    if (leader == null)
                    {
                        leader = KafkaConsumerUtil.findLeader(topic, partitionId, 5);
                    }
                }
                catch (Exception e)
                {
                    LOGGER.log(Level.SEVERE, "Cannot find leader for topic " + topic
                            + " of partition " + partitionId
                            + ", will retry at next schedule,", e);
                }

                // leader cannot be found, try at next execution
                if (leader == null)
                {
                    return;
                }

                // check simpleconsumer status; if null, create a new
                // simpleconsumer instance
                if (consumer == null)
                {
                    LOGGER.log(Level.INFO, "Leader for {0} is {1}.",
                            new Object[] { clientName, leader });
                    consumer = new SimpleConsumer(leader.split(":")[0],
                            Integer.parseInt(leader.split(":")[1]),
                            TIME_OUT, BUFFER_SIZE, clientName);
                }

                // check read offset status; if null, get the earliest offset
                // from the kafka broker
                if (readOffset == null)
                {
                    readOffset = KafkaConsumerUtil.getOffset(consumer, topic,
                            partitionId, true, clientName);
                }

                // fetch messages
                ByteBufferMessageSet fetchedMessages =
                        handleFetch(consumer, topic, partitionId, readOffset);
                LOGGER.log(Level.INFO, "Reading from offset {0} for {1}.",
                        new Object[] { readOffset, clientName });

                // read fetched messages and write
                if (fetchedMessages != null)
                {
                    LOGGER.log(Level.INFO, "Fetched message of size {0} bytes.",
                            fetchedMessages.sizeInBytes());
                    for (MessageAndOffset messageAndOffset : fetchedMessages)
                    {
                        long currentOffset = messageAndOffset.offset();
                        // the original attachment ends here; what follows is
                        // an assumed, minimal completion of the fetch loop
                        if (currentOffset < readOffset)
                        {
                            // compressed message sets may replay earlier offsets
                            continue;
                        }
                        // ... write messageAndOffset.message() to the Writer ...
                        processedOffset = currentOffset;
                        readOffset = messageAndOffset.nextOffset();
                    }
                }
            }
            catch (Exception e)
            {
                errorCount++;
                LOGGER.log(Level.SEVERE, "Error while consuming for " + clientName, e);
            }
        }
    }

    // minimal fetch helper (assumed): issues one FetchRequest and unwraps errors
    private ByteBufferMessageSet handleFetch(SimpleConsumer consumer, String topic,
            int partitionId, long offset)
    {
        FetchRequest request = new FetchRequestBuilder().clientId(clientName)
                .addFetch(topic, partitionId, offset, FETCH_SIZE).build();
        FetchResponse response = consumer.fetch(request);
        if (response.hasError())
        {
            short code = response.errorCode(topic, partitionId);
            LOGGER.log(Level.SEVERE, "Fetch failed for {0} with error code {1}.",
                    new Object[] { clientName, code });
            if (code == ErrorMapping.OffsetOutOfRangeCode())
            {
                readOffset = null; // re-resolve a valid offset on the next pass
            }
            return null;
        }
        return response.messageSet(topic, partitionId);
    }
}

Connection reset by peer

2014-02-01 Thread Ranjith Venkatesan
Hi ,

 We are evaluating Kafka 0.8 for our product as a queue system. Our
architecture is simple: our (single) producer sends messages to any of the
topics on the broker. A thread runs for each topic every 10 seconds and
consumes from its corresponding topic. For each stream we use a thread pool
to increase the degree of parallelism. But when shutting down the consumer,
I get the following exception in the broker:


 [2014-02-01 13:06:43,240] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
 java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcher.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:233)
  at sun.nio.ch.IOUtil.read(IOUtil.java:206)
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:236)
  at kafka.utils.Utils$.read(Utils.scala:395)
  at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
  at kafka.network.Processor.read(SocketServer.scala:347)
  at kafka.network.Processor.run(SocketServer.scala:245)
  at java.lang.Thread.run(Thread.java:619)




I am using the high-level consumer; the consumer configuration is given
below:




Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", groupid);
props.put("zookeeper.session.timeout.ms", "12000");
props.put("zookeeper.sync.time.ms", "200");
// props.put("auto.commit.interval.ms", "1000");
props.put("auto.commit.enable", "false");
// props.put("consumer.timeout.ms", "1");
props.put("auto.offset.reset", "smallest");
props.put("socket.timeout.ms", "1");



Am I missing something, or is this an expected exception?


PS: We are using the Java client


Thanks in advance




Ranjith Venkatesan