CSharp library and Producer: Closing socket because of error (kafka.network.Processor), java.nio.BufferUnderflowException

2014-05-13 Thread Margusja

Hi

I have kafka broker running (kafka_2.9.1-0.8.1.1)
All is working.

One project requires that the producer be written in CSharp.
I am not a .NET programmer, but I managed to write simple producer code 
using 
https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md


the code
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            Message msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            Message msg2 = new Message(payloadData2);

            // "broker" is the broker's host name
            Producer producer = new Producer("broker", 9092);
            producer.Send("kafkademo3", 0, msg1);
        }
    }
}
...

On the broker side I am getting the following error when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because 
of error (kafka.network.Processor)

java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
at java.nio.ByteBuffer.get(ByteBuffer.java:694)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:53)

at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)



[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 
because of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

I suspected that the problem is in the broker version 
(kafka_2.9.1-0.8.1.1) so I downloaded kafka-0.7.1-incubating.

Now I was able to send messages using CSharp code.

So is there a workaround that lets me use the latest Kafka version with 
CSharp? Or what is the latest Kafka version that supports a CSharp producer?


And one more question: in the CSharp lib, how can I give the producer a 
broker list to get fault tolerance in case one broker is down?


--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)



Re: Kafka producer in CSharp

2014-05-13 Thread Margusja

Thank you for the response. I think HTTP is OK.
I have two more questions about HTTP:
1. Can I have fault tolerance if I have two or more brokers?
2. Can I get an ack that a message is in the queue?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 12/05/14 23:28, Joe Stein wrote:

The wire protocol has changed drastically since then.

I don't know of any C# clients (there are none on the client library page
nor have I heard of any being used in production but maybe there are some).


For clients that use DotNet I often suggest that they use some HTTP
producer/consumer
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

/***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Mon, May 12, 2014 at 12:34 PM, Margusja mar...@roo.ee wrote:


Hi

I have kafka broker running (kafka_2.9.1-0.8.1.1)
All is working.

One project requires that the producer be written in CSharp.
I am not a .NET programmer, but I managed to write simple producer code
using https://github.com/kafka-dev/kafka/blob/master/clients/
csharp/README.md

the code
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
 class Program
 {
 static void Main(string[] args)
 {
 string payload1 = "kafka 1.";
 byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
 Message msg1 = new Message(payloadData1);

 string payload2 = "kafka 2.";
 byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
 Message msg2 = new Message(payloadData2);

 Producer producer = new Producer("broker", 9092);
 producer.Send("kafkademo3", 0, msg1);
 }
 }
}
...

On the broker side I am getting the following error when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because
of error (kafka.network.Processor)
java.nio.BufferUnderflowException
 at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
 at java.nio.ByteBuffer.get(ByteBuffer.java:694)
 at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
 at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
 at kafka.network.RequestChannel$Request.init(RequestChannel.
scala:53)
 at kafka.network.Processor.read(SocketServer.scala:353)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:744)



[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 because
of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:375)
 at kafka.network.BoundedByteBufferReceive.readFrom(
BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:347)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:744)

I suspected that the problem is in the broker version
(kafka_2.9.1-0.8.1.1) so I downloaded kafka-0.7.1-incubating.
Now I was able to send messages using CSharp code.

So is there a workaround that lets me use the latest Kafka version with CSharp? Or
what is the latest Kafka version that supports a CSharp producer?

--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)






Re: Kafka producer in CSharp

2014-05-13 Thread Margusja

OK, I got some info myself.

I can have fault tolerance - I can start kafka-http-endpoint with a 
broker list.

I can have acks - start it with --sync.

But what is the best practice if kafka-http-endpoint goes down?

Start multiple kafka-http-endpoint instances and, on the client side, just 
check that the kafka-http-endpoint is up, and if it is not, use another one?
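
One way to handle this on the client side is to keep a list of endpoints and 
fail over between them. A minimal sketch in Java (the endpoint URLs and the 
/produce path are illustrative assumptions, not the actual kafka-http-endpoint 
API):

...
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class FailoverHttpProducer {
    // Hypothetical kafka-http-endpoint instances; substitute your own.
    private static final String[] ENDPOINTS = {
        "http://endpoint1:8080/produce", "http://endpoint2:8080/produce" };

    public static void send(byte[] payload) throws Exception {
        Exception last = null;
        for (String endpoint : ENDPOINTS) { // try each endpoint in turn
            try {
                HttpURLConnection conn =
                    (HttpURLConnection) new URL(endpoint).openConnection();
                conn.setConnectTimeout(1000);
                conn.setRequestMethod("POST");
                conn.setDoOutput(true);
                OutputStream out = conn.getOutputStream();
                out.write(payload);
                out.close();
                if (conn.getResponseCode() == 200) {
                    return; // delivered
                }
            } catch (Exception e) {
                last = e; // endpoint unreachable, fall through to the next one
            }
        }
        throw last != null ? last : new Exception("no endpoint accepted the message");
    }
}
...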


Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 13/05/14 10:49, Margusja wrote:

Thank you for the response. I think HTTP is OK.
I have two more questions about HTTP:
1. Can I have fault tolerance if I have two or more brokers?
2. Can I get an ack that a message is in the queue?

Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)

On 12/05/14 23:28, Joe Stein wrote:

The wire protocol has changed drastically since then.

I don't know of any C# clients (there are none on the client library 
page
nor have I heard of any being used in production but maybe there are 
some).



For clients that use DotNet I often suggest that they use some HTTP
producer/consumer
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST 



/***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Mon, May 12, 2014 at 12:34 PM, Margusja mar...@roo.ee wrote:


Hi

I have kafka broker running (kafka_2.9.1-0.8.1.1)
All is working.

One project requires that the producer be written in CSharp.
I am not a .NET programmer, but I managed to write simple producer code
using https://github.com/kafka-dev/kafka/blob/master/clients/
csharp/README.md

the code
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
 class Program
 {
 static void Main(string[] args)
 {
 string payload1 = "kafka 1.";
 byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
 Message msg1 = new Message(payloadData1);

 string payload2 = "kafka 2.";
 byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
 Message msg2 = new Message(payloadData2);

 Producer producer = new Producer("broker", 9092);
 producer.Send("kafkademo3", 0, msg1);
 }
 }
}
...

On the broker side I am getting the following error when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because
of error (kafka.network.Processor)
java.nio.BufferUnderflowException
 at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
 at java.nio.ByteBuffer.get(ByteBuffer.java:694)
 at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
 at 
kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
 at 
kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
 at 
kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)

 at kafka.network.RequestChannel$Request.init(RequestChannel.
scala:53)
 at kafka.network.Processor.read(SocketServer.scala:353)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:744)



[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 
because

of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at 
sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)

 at kafka.utils.Utils$.read(Utils.scala:375)
 at kafka.network.BoundedByteBufferReceive.readFrom(
BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:347)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:744)

I suspected that the problem is in the broker version
(kafka_2.9.1-0.8.1.1) so I downloaded kafka-0.7.1-incubating.
Now I was able to send messages using CSharp code.

So is there a workaround that lets me use the latest Kafka version with 
CSharp? Or what is the latest Kafka version that supports a CSharp producer?

--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)








TCP_TOO_MANY_ESTAB_CONNS

2014-05-13 Thread Ranjith Venkatesan
Dear all,

  We started using Kafka as a queue server recently. Our production setup 
has 3 Kafka servers and 3 ZooKeepers. We have 30 topics with 10 partitions 
each, and a producer pool (10 producers per app server), so 30 producers in 
total. With our monitoring tool, we frequently see the TCP_TOO_MANY_ESTAB_CONNS 
warning. Why does this warning come up so frequently? Are we missing something?




PS : Our monitoring tool's threshold is 600 connections


thanks in advance


Ranjith Venkatesan
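
A rough back-of-the-envelope for where the established connections come from 
in a layout like this (assuming the 0.8 producer keeps one TCP connection per 
broker it sends to, which is its usual behavior):

...
producers:    30 producers x 3 brokers = up to 90 producer sockets
consumers:    roughly 1 fetcher socket per consumer thread per broker
replication:  each broker fetches from the other 2 = 6 inter-broker sockets
zookeeper:    1 session per broker, per consumer, and per admin tool
...

If the 600-connection threshold is measured per host across all processes, it 
is worth checking with netstat which local process and remote peer dominate 
before concluding that Kafka is leaking connections.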






What happens to Kafka when ZK lost its quorum?

2014-05-13 Thread Connie Yang
Hi all,

Can Kafka producers, brokers and consumers still process messages and
function in their normal states if ZooKeeper loses its quorum?

Thanks,
Connie


Re: Kafka producer in CSharp

2014-05-13 Thread Joe Stein
The wire protocol has changed drastically since then.

I don't know of any C# clients (there are none on the client library page
nor have I heard of any being used in production but maybe there are some).


For clients that use DotNet I often suggest that they use some HTTP
producer/consumer
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Mon, May 12, 2014 at 12:34 PM, Margusja mar...@roo.ee wrote:

 Hi

 I have kafka broker running (kafka_2.9.1-0.8.1.1)
 All is working.

 One project requires that the producer be written in CSharp.
 I am not a .NET programmer, but I managed to write simple producer code
 using https://github.com/kafka-dev/kafka/blob/master/clients/
 csharp/README.md

 the code
 ...
 using System;
 using System.Collections.Generic;
 using System.Text;
 using System.Threading.Tasks;
 using Kafka.Client;

 namespace DemoProducer
 {
 class Program
 {
 static void Main(string[] args)
 {
 string payload1 = "kafka 1.";
 byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
 Message msg1 = new Message(payloadData1);

 string payload2 = "kafka 2.";
 byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
 Message msg2 = new Message(payloadData2);

 Producer producer = new Producer("broker", 9092);
 producer.Send("kafkademo3", 0, msg1);
 }
 }
 }
 ...

 On the broker side I am getting the following error when I execute the code above:

 [2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because
 of error (kafka.network.Processor)
 java.nio.BufferUnderflowException
 at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
 at java.nio.ByteBuffer.get(ByteBuffer.java:694)
 at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
 at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
 at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
 at kafka.network.RequestChannel$Request.init(RequestChannel.
 scala:53)
 at kafka.network.Processor.read(SocketServer.scala:353)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:744)



 [2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 because
 of error (kafka.network.Processor)
 java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at kafka.utils.Utils$.read(Utils.scala:375)
 at kafka.network.BoundedByteBufferReceive.readFrom(
 BoundedByteBufferReceive.scala:54)
 at kafka.network.Processor.read(SocketServer.scala:347)
 at kafka.network.Processor.run(SocketServer.scala:245)
 at java.lang.Thread.run(Thread.java:744)

 I suspected that the problem is in the broker version
 (kafka_2.9.1-0.8.1.1) so I downloaded kafka-0.7.1-incubating.
 Now I was able to send messages using CSharp code.

 So is there a workaround that lets me use the latest Kafka version with CSharp? Or
 what is the latest Kafka version that supports a CSharp producer?

 --
 Best regards, Margus (Margusja) Roo
 +372 51 48 780
 http://margus.roo.ee
 http://ee.linkedin.com/in/margusroo
 skype: margusja
 ldapsearch -x -h ldap.sk.ee -b c=EE (serialNumber=37303140314)




Re: What happens to Kafka when ZK lost its quorum or becomes unstable?

2014-05-13 Thread Jun Rao
The brokers are designed with the expectation that the ZK service is always
available. If ZK is not available, the clients may not be able to
send/fetch data properly.

Thanks,

Jun


On Tue, May 13, 2014 at 6:48 AM, Connie Yang cybercon...@gmail.com wrote:

 Hi,

 Can the producers, brokers and consumers still process messages when
 their ZK cluster loses its quorum or becomes unstable?  I know this is a
 rather general question, as it may depend on what configuration they use,
 so please enumerate the relevant combinations.

 Thanks,
 Connie



Re: Loss of Leader in Kafka

2014-05-13 Thread Neha Narkhede
 3. Deleted the topic. Checked only the zookeeper to see if the
/brokers/topics DOES NOT have the topic

You are seeing this problem since delete topic is not supported in Kafka.
Any attempt to delete a topic may leave your cluster in an unstable state.
We plan to release 0.8.2 with delete topic support or you could try the
latest trunk.


On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar kashya...@gmail.comwrote:

 Hi,
 I am hitting a strange exception while creating a topic in Kafka -
 Steps to generate this-
 1. Created a topic multipartition_test with 2 partitions and 2 replicas
 2. Added some data to this topics and verified data is coming up for both
 partitions
 3. Deleted the topic. Checked only the zookeeper to see if the
 /brokers/topics DOES NOT have the topic
 4. Recreated the topic in exactly the same way as in point 1.

 After this, when I list topics using ./kafka-list-topic.sh, I see that
 *leader:* none and *isr:* for this topic. State change logs give the
 following exception.

 kafka.common.StateChangeFailedException: encountered error while electing
 leader for partition [multipartition_test,1] due to: LeaderAndIsr
 information doesn't exist for partition [multipartition_test,1] in
 OnlinePartition state.
 at

 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
 at

 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
 at

 kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
 at

 kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
 at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
 at

 kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
 at

 kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
 at

 kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
 at

 kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
 at

 kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr
 information doesn't exist for partition [multipartition_test,1] in
 OnlinePartition state
 at

 kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
 at

 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
 ... 11 more

 Can you please tell me what I am doing wrong?

 Regards,
 kashyap



What happens to Kafka when ZK lost its quorum or becomes unstable?

2014-05-13 Thread Connie Yang
Hi,

Can the producers, brokers and consumers still process messages when
their ZK cluster loses its quorum or becomes unstable?  I know this is a
rather general question, as it may depend on what configuration they use,
so please enumerate the relevant combinations.

Thanks,
Connie


100 MB messages

2014-05-13 Thread Wouter Bosland
Hello everyone,

Can Kafka be used for binary large objects of 100 MB?

Or should I use a different solution, like MongoDB, to store these files and
maybe send the location of these files in MongoDB over Kafka?



Thanks in advance,

Wouter


Re: Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled

2014-05-13 Thread Jun Rao
In 0.8, ZK is critical for managing the brokers. So, we do expect the ZK
service to always be available.

Thanks,

Jun


On Mon, May 12, 2014 at 9:59 AM, Yury Ruchin yuri.ruc...@gmail.com wrote:

 Hi all,

 I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector.
 Steps to reproduce:

 1. Start single-broker Kafka cluster with auto.create.topic.enable set to
 true
 2. Start ConsumerConnector on topic (which does not yet exist) with
 auto.offset.reset set to smallest.
 3. Produce some data to the topic.
 4. Bring Zookeeper down
 5. Call ConsumerConnector.close()

 Observation - the call blocks forever with the following stack trace:

java.lang.Thread.State: TIMED_WAITING (parking)

 at sun.misc.Unsafe.park(Native Method)

 - parking to wait for  0xc6bf0570 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

 at
 java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)

 at

 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)

 at
 org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)

 at
 org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)

 at
 org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)

 at
 org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)

 at
 org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)

 at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)

 at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)

 at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
 Source)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
 Source)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
 Source)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
 Source)

 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at kafka.utils.Pool$$anon$1.foreach(Unknown Source)

 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

 at kafka.utils.Pool.foreach(Unknown Source)

 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown
 Source)

 at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown
 Source)

 at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown
 Source)

 - locked 0xc6be0c60 (a java.lang.Object)

 at
 kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
...

 Once I set auto.commit.enable to false, the problem is gone. This is
 identical to what's described in
 https://issues.apache.org/jira/browse/KAFKA-601, with the only difference
 that it applies to Kafka 0.8 rather than Kafka 0.7.2. Any way to solve this
 issue other than disabling auto-commit?

 Thanks,
 Yury



Re: TCP_TOO_MANY_ESTAB_CONNS

2014-05-13 Thread Guozhang Wang
Hi Ranjith,

Which version of Kafka are you using?

Guozhang


On Tue, May 13, 2014 at 3:53 AM, Ranjith Venkatesan
ranjit...@zohocorp.comwrote:

 Dear all,

   We started using Kafka as a queue server recently. Our production
 setup has 3 Kafka servers and 3 ZooKeepers. We have 30 topics with 10
 partitions each, and a producer pool (10 producers per app server), so 30
 producers in total. With our monitoring tool, we frequently see the
 *TCP_TOO_MANY_ESTAB_CONNS* warning. Why does this warning come up so
 frequently? Are we missing something?


 PS : Our monitoring tool's threshold is 600 connections

 thanks in advance

 Ranjith Venkatesan




-- 
-- Guozhang


Re: 100 MB messages

2014-05-13 Thread Joe Stein
Or HDFS, and use Kafka for the file event, yup. Processing on the files can 
be done without the MapReduce overhead in Hadoop now using Apache Tez (or 
something that uses Tez, like Pig).


/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
/


 On May 13, 2014, at 4:40 AM, Wouter Bosland wbosland.pr...@gmail.com wrote:
 
 Hello everyone,
 
 Can Kafka be used for binary large objects of 100 MB ?
 
 Or should I use a different solution to store these files like MongoDB and
 maybe send the location of these files in MongoDB over Kafka?
 
 
 
 Thanks in advance,
 
 Wouter


Killing last replica for partition doesn't change ISR/Leadership if replica is running controller

2014-05-13 Thread Alex Demidko
Hi,

Kafka version is 0.8.1.1. We have three machines: A, B, C. Let's say there is a 
topic with replication 2, and one of its partitions - partition 1 - is placed on 
brokers A and B. If broker A is already down, then for partition 1 we 
have: Leader: B, ISR: [B]. If the current controller is node C, then killing 
broker B will turn partition 1 into state: Leader: -1, ISR: []. But if the 
current controller is node B, then killing it won't update the leadership/ISR for 
partition 1 even when the controller is restarted on node C, so partition 1 
will forever think its leader is node B, which is dead.

It looks like KafkaController.onBrokerFailure handles the situation where the broker 
that went down is the partition leader - it sets the new leader value to -1. By 
contrast, KafkaController.onControllerFailover never removes the leader from a 
partition with all replicas offline - allegedly because the partition gets into the 
ReplicaDeletionIneligible state. Is this intended behavior?

This behavior affects DefaultEventHandler.getPartition in the null-key case - 
it can't determine that partition 1 has no leader, and this results in 
event send failures.


What we are trying to achieve is to be able to write data even if some 
partitions lost all replicas, which is a rare yet still possible scenario. Using 
a null key looked suitable with minor DefaultEventHandler modifications (like 
getting rid of DefaultEventHandler.sendPartitionPerTopicCache to avoid 
caching and uneven event distribution), as we neither use log compaction nor 
rely on partitioning of the data. We had such behavior with Kafka 0.7 - if a 
node is down, simply produce to a different one.


Thanks, 
Alex



Re: Loss of Leader in Kafka

2014-05-13 Thread Guozhang Wang
Hi Kashyap,

This may be a real issue with the delete-topic feature; could you also
reproduce this with trunk HEAD and 1 partition / 1 replica?

Guozhang


On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar kashya...@gmail.comwrote:

 Hi,
 I am hitting a strange exception while creating a topic in Kafka -
 Steps to generate this-
 1. Created a topic multipartition_test with 2 partitions and 2 replicas
 2. Added some data to this topics and verified data is coming up for both
 partitions
 3. Deleted the topic. Checked only the zookeeper to see if the
 /brokers/topics DOES NOT have the topic
 4. Recreated the topic in exactly the same way as in point 1.

 After this, when I list topics using ./kafka-list-topic.sh, I see that
 *leader:* none and *isr:* for this topic. State change logs give the
 following exception.

 kafka.common.StateChangeFailedException: encountered error while electing
 leader for partition [multipartition_test,1] due to: LeaderAndIsr
 information doesn't exist for partition [multipartition_test,1] in
 OnlinePartition state.
 at

 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
 at

 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
 at

 kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
 at

 kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
 at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
 at

 kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
 at

 kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
 at

 kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
 at

 kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
 at

 kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
 at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr
 information doesn't exist for partition [multipartition_test,1] in
 OnlinePartition state
 at

 kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
 at

 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
 ... 11 more

 Can you please tell me what I am doing wrong?

 Regards,
 kashyap




-- 
-- Guozhang


Re: Kafka 0.7.X maven repo

2014-05-13 Thread Joe Stein
Maven wasn't introduced until 0.8.0.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/


On Tue, May 13, 2014 at 9:52 AM, Vijay Ramachandran vramachand...@apple.com
 wrote:

 We are currently using kafka 0.8.X but want to try something with 0.7.x
 and compare.

 Can anyone point me to the kafka 0.7.X maven repo ?

 Thanks in advance

 Vijay



zookeeper down

2014-05-13 Thread Weide Zhang
Can Kafka survive when ZooKeeper is down and not connectable? Will the
consumer or producer still work in that case?

Weide


0.7 maven repo

2014-05-13 Thread Vijay Ramachandran
We are currently using kafka 0.8.X but want to try something with 0.7.x and 
compare.

Can anyone point me to the kafka 0.7.X maven repo ?

Thanks in advance

Vijay


Re: 100 MB messages

2014-05-13 Thread Jay Kreps
It can, but it will not perform very well. Kafka fully instantiates
messages in memory (as a byte[] basically) so if you send a 100MB message
the server will do a 100MB allocation to hold that data prior to writing to
disk.

I think MongoDB does have blob support so passing a pointer via Kafka as
you describe may be a better solution.

-Jay
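
A minimal sketch of that pointer-passing approach with the 0.8 Java producer 
(the topic name, broker address and JSON layout are illustrative assumptions):

...
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BlobPointerProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // illustrative address
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        // Store the 100 MB blob elsewhere (e.g. MongoDB GridFS) and send
        // only its location over Kafka.
        String pointer = "{\"store\":\"gridfs\",\"id\":\"someObjectId\"}"; // hypothetical id
        producer.send(new KeyedMessage<String, String>("blob-locations", pointer));
        producer.close();
    }
}
...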


On Tue, May 13, 2014 at 1:40 AM, Wouter Bosland wbosland.pr...@gmail.comwrote:

 Hello everyone,

 Can Kafka be used for binary large objects of 100 MB ?

 Or should I use a different solution to store these files like MongoDB and
 maybe send the location of these files in MongoDB over Kafka?



 Thanks in advance,

 Wouter



kafka performance question

2014-05-13 Thread Zhujie (zhujie, Smartcare)
Dear all,

We want to use Kafka to collect and dispatch data files, but the performance is 
lower than we want.

In our cluster there is a provider and a broker. We use one thread to read a file 
from the provider's local disk and send it to the broker. The average throughput is 
only 3-4 MB/s.
But if we just use the Java NIO API to send the file, the throughput can exceed 200 MB/s.
Why is the Kafka performance so bad in our test? Are we missing something?



Our server:
Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
Mem:300G
Disk:600G 15K RPM SAS*8

Configuration of provider:
props.put("serializer.class", "kafka.serializer.NullEncoder");
props.put("metadata.broker.list", "169.10.35.57:9092");
props.put("request.required.acks", "0");
props.put("producer.type", "async"); // async
props.put("queue.buffering.max.ms", "500");
props.put("queue.buffering.max.messages", "10");
props.put("batch.num.messages", "1200");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("send.buffer.bytes", "10240");

Configuration of broker:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Socket Server Settings #

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it 
uses the
# value for host.name if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
#num.network.threads=2
# The number of threads doing disk I/O
#num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
#socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
#socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection 
against OOM)
#socket.request.max.bytes=104857600


# Log Basics #

# A comma seperated list of directories under which to store log files
log.dirs=/data/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#num.partitions=2

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
# the OS cache lazily. The following configurations control the flush of data 
to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the 
flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data 
after a period of time or
# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Log Retention Policy #

# The following configurations control the disposal of log segments. The policy 
can
# be set to delete segments after a period of time, or after a given size has 
accumulated.
# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
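
The provider configuration earlier in this message suggests two likely 
throttles: queue.buffering.max.messages=10 caps the async queue at ten 
messages, and send.buffer.bytes=10240 is only a 10 KB socket buffer. A hedged 
sketch of values to experiment with (illustrative, not tuned for this hardware):

...
props.put("producer.type", "async");
props.put("batch.num.messages", "1000");
props.put("queue.buffering.max.messages", "100000"); // default is 10000; a cap of 10 starves the sender
props.put("queue.buffering.max.ms", "500");
props.put("send.buffer.bytes", "1048576");           // 1 MB socket buffer instead of 10 KB
props.put("compression.codec", "snappy");            // optional: trade CPU for network throughput
...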

Re: Loss of Leader in Kafka

2014-05-13 Thread Kashyap Mhaisekar
After topic deletion, I deleted the physical folders in the data log directories
and then restarted Kafka. That resolved it. Thanks!
Will try out 0.8.1 and let the group know.

Regards,
Kashyap

On Monday, May 12, 2014, Jun Rao jun...@gmail.com wrote:

 Delete topic doesn't quite work in 0.8.1. We recently fixed it in trunk.
 Could you give it a try and see if you see the same issue?

 Thanks,

 Jun


 On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar 
 kashya...@gmail.com
 wrote:

  Hi,
  I am hitting a strange exception while creating a topic in Kafka -
  Steps to generate this-
  1. Created a topic multipartition_test with 2 partitions and 2 replicas
  2. Added some data to this topics and verified data is coming up for both
  partitions
  3. Deleted the topic. Checked only the zookeeper to see if the
  /brokers/topics DOES NOT have the topic
  4. Recreated the topic in exactly the same way as in point 1.
 
  After this, when I list topics using ./kafka-list-topic.sh, I see that
  *leader:* none and *isr:* for this topic. State change logs give the
  following exception.
 
  kafka.common.StateChangeFailedException: encountered error while electing
  leader for partition [multipartition_test,1] due to: LeaderAndIsr
  information doesn't exist for partition [multipartition_test,1] in
  OnlinePartition state.
  at
 
 
 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
  at
 
 
 kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
  at
 
 
 kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
  at
 
 
 kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
  at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
  at
 
 
 kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
  at
 
 
 kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
  at
 
 
 kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
  at
 
 
 kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
  at
 
 
 kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
  at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
  at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
  Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr
  information doesn't exist for partition [multipartition_test,1] in
  OnlinePartition state
  at
 
 
 kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
  at
 
 
 kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
  ... 11 more
 
  Can you please tell me what I am doing wrong?
 
  Regards,
  kashyap
 



Re: keyed-messages de-duplication

2014-05-13 Thread Jay Kreps
Hi,

The compaction is done to clean up space. It isn't done immediately, only
periodically.

I suspect the reason you see no compaction is that we never compact the
active segment of the log (the most recent file) as that is still being
written to. The compaction would not happen until a new segment file was
rolled. If you want to see this happen I recommend changing the file
segment size configuration to something small (5 MB) and producing enough
messages to roll a new segment file. You should then see logging about
compaction in logs/log-cleaner.log.

-Jay


On Tue, May 13, 2014 at 11:52 AM, C 4.5 cfourf...@gmail.com wrote:

 I understand Kafka supports keyed messages (I am using 0.8.1.1) and that it is
 possible to de-duplicate messages based on the message key.

 (The log compaction section of the online documentation describes how that
 works.)

 I am using a code example that come with Kafka (namely
 KafkaConsumerProducerDemo) and run it through Kafka local mode. I write a
 set of messages with the same String key and then have a consumer that
 consumes data.

 The consumer consumes messages *only* after the producer has produced all
 its messages.

 I would expect the consumer to retrieve only the latest message (as all
 messages have the same key) but it retrieves all messages the producer has
 emitted.

 I have also turned on these properties in the Kafka server:

 log.cleaner.enable=true
 log.cleanup.policy=dedupe

 - is de-duplication of messages guaranteed to take effect only after
 compaction?

 - I have tried to force compaction by setting log.cleaner.backoff.ms
 and log.cleaner.min.cleanable.ratio to very low values, but I still
 observe the same behavior.

 Any ideas or pointers?

 Thanks.

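Following Jay's suggestion, a sketch of broker settings that make segments roll 
quickly so that older segments become eligible for cleaning (values are 
illustrative, not tuned):

...
log.cleaner.enable=true
log.segment.bytes=5242880     # roll a new segment roughly every 5 MB
log.cleaner.backoff.ms=5000   # have the cleaner re-check for work more often
...
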


Re: Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled

2014-05-13 Thread Guozhang Wang
Hi Yury,

When auto-commit is turned on, before the consumer shuts down completely it
needs to commit its last offset to ZK to avoid any duplicates being consumed
when it starts over. If ZK is not up and running, the shutdown would rather
get stuck than let it go and incur duplicates. This behavior is by design:
for consistent offset maintenance, ZK should be up all the time.

Guozhang
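
If the blocking shutdown is unacceptable, a workaround consistent with this 
design is to disable auto-commit and commit explicitly while ZK is known to be 
reachable. A minimal sketch (the zookeeper.connect and group.id values are 
placeholders):

...
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ManualCommitShutdown {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181"); // placeholder
        props.put("group.id", "demo-group");        // placeholder
        props.put("auto.commit.enable", "false");   // no final auto-commit inside shutdown()
        props.put("auto.offset.reset", "smallest");

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // ... create message streams and consume ...
        consumer.commitOffsets(); // commit manually while ZK is up
        consumer.shutdown();      // no longer blocks on a ZK write
    }
}
...

The trade-off is the one described above: anything consumed after the last 
explicit commit may be consumed again on restart.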


On Mon, May 12, 2014 at 9:59 AM, Yury Ruchin yuri.ruc...@gmail.com wrote:

 Hi all,

 I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector.
 Steps to reproduce:

 1. Start single-broker Kafka cluster with auto.create.topic.enable set to
 true
 2. Start ConsumerConnector on topic (which does not yet exist) with
 auto.offset.reset set to smallest.
 3. Produce some data to the topic.
 4. Bring Zookeeper down
 5. Call ConsumerConnector.close()

 Observation - the call blocks forever with the following stack trace:

java.lang.Thread.State: TIMED_WAITING (parking)

 at sun.misc.Unsafe.park(Native Method)

 - parking to wait for  0xc6bf0570 (a
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

 at
 java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)

 at

 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)

 at
 org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)

 at
 org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)

 at
 org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)

 at
 org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)

 at
 org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)

 at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)

 at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)

 at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
 Source)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
 Source)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
 Source)

 at

 kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
 Source)

 at

 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

 at scala.collection.Iterator$class.foreach(Iterator.scala:727)

 at kafka.utils.Pool$$anon$1.foreach(Unknown Source)

 at
 scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

 at kafka.utils.Pool.foreach(Unknown Source)

 at

 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

 at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown
 Source)

 at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown
 Source)

 at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown
 Source)

 - locked 0xc6be0c60 (a java.lang.Object)

 at
 kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
...

 Once I set auto.commit.enable to false, the problem is gone. This is
 identical to what's described in
 https://issues.apache.org/jira/browse/KAFKA-601, with the only difference
 that it applies to Kafka 0.8 rather than Kafka 0.7.2. Any way to solve this
 issue other than disabling auto-commit?

 Thanks,
 Yury




-- 
-- Guozhang


Re: Killing last replica for partition doesn't change ISR/Leadership if replica is running controller

2014-05-13 Thread Jun Rao
Yes, that seems like a real issue. Could you file a jira?

Thanks,

Jun


On Tue, May 13, 2014 at 11:58 AM, Alex Demidko alexan...@metamarkets.comwrote:

 Hi,

 Kafka version is 0.8.1.1. We have three machines: A, B, C. Let's say there
 is a topic with replication 2, and one of its partitions - partition 1 - is
 placed on brokers A and B. If broker A is already down, then for
 partition 1 we have: Leader: B, ISR: [B]. If the current controller is node
 C, then killing broker B will turn partition 1 into state: Leader: -1,
 ISR: []. But if the current controller is node B, then killing it won't
 update the leadership/ISR for partition 1 even when the controller is
 restarted on node C, so partition 1 will forever think its leader is node
 B, which is dead.

 It looks like KafkaController.onBrokerFailure handles the situation where
 the broker that went down is the partition leader - it sets the new leader
 value to -1. By contrast, KafkaController.onControllerFailover never removes
 the leader from a partition with all replicas offline - allegedly because
 the partition gets into the ReplicaDeletionIneligible state. Is this
 intended behavior?

 This behavior affects DefaultEventHandler.getPartition in the null-key
 case - it can't determine that partition 1 has no leader, and this results
 in event send failures.


 What we are trying to achieve is to be able to write data even if some
 partitions lost all replicas, which is a rare yet still possible scenario.
 Using a null key looked suitable with minor DefaultEventHandler
 modifications (like getting rid of
 DefaultEventHandler.sendPartitionPerTopicCache to avoid caching and uneven
 event distribution), as we neither use log compaction nor rely on
 partitioning of the data. We had such behavior with Kafka 0.7 - if a node
 is down, simply produce to a different one.


 Thanks,
 Alex




Kafka 0.7.X maven repo

2014-05-13 Thread Vijay Ramachandran
We are currently using kafka 0.8.X but want to try something with 0.7.x and 
compare.

Can anyone point me to the kafka 0.7.X maven repo ?

Thanks in advance

Vijay