Re: How to produce and consume events in 2 DCs?

2014-10-22 Thread Erik van Oosten

Hi Steven,

That doesn't work. In your proposal, mirrormaker in one DC would copy 
messages from topic A to the other DC's topic A. However, the other DC 
runs a mirrormaker which does the same, creating a loop. Messages would 
be duplicated, triplicated, etc. in a never-ending loop.


Mirroring to another topic would work (though mirrormaker doesn't support 
that), and so would mirroring to another cluster. Neha's proposal would 
work as well, but I assume it's a lot more work for the Kafka internals 
and therefore, IMHO, wouldn't meet the KISS principle.


Kind regards,
Erik.


Steven Wu wrote on 22-10-14 at 01:48:

I think it doesn't have to be two more clusters; it can be just two more
topics. MirrorMaker can copy from the source topics in both regions into one
aggregate topic.

On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten 
e.vanoos...@grons.nl.invalid wrote:


Thanks Neha,

Unfortunately, the maintenance overhead of 2 more clusters is not
acceptable to us.

Would you accept a pull request on mirror maker that would rename topics
on the fly?

For example by accepting the parameter rename:
--rename src1/dest1,src2/dest2
or, extended with RE support:
--rename old_(.*)/new_\1
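
To illustrate the intended semantics of the rule format (a hypothetical Java
sketch, not existing mirrormaker code; note that Java's replacement syntax
uses $1 where \1 is written above):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper showing how a single --rename rule could be applied.
public class TopicRenamer {
    private final Pattern src;
    private final String dest;

    public TopicRenamer(String srcPattern, String destReplacement) {
        this.src = Pattern.compile(srcPattern);
        this.dest = destReplacement;
    }

    // Returns the destination topic name, or the original name when the
    // rule does not match, so unrelated topics pass through unchanged.
    public String rename(String topic) {
        Matcher m = src.matcher(topic);
        return m.matches() ? m.replaceFirst(dest) : topic;
    }

    public static void main(String[] args) {
        TopicRenamer r = new TopicRenamer("old_(.*)", "new_$1");
        System.out.println(r.rename("old_events"));   // prints "new_events"
        System.out.println(r.rename("other_topic"));  // passes through
    }
}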

Kind regards,
 Erik.


On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com
wrote the following:


Another way to set up this kind of mirroring is by deploying 2 clusters in
each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
maker copies data from both DCs' local clusters into the aggregate
clusters. So if you want access to a topic with data from both DCs, you
subscribe to the aggregate cluster.
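
For example, the aggregate-cluster mirror maker in one DC could be started
along these lines (0.8 mirror maker tool; the .properties file names are
illustrative):

bin/kafka-run-class.sh kafka.tools.MirrorMaker \
  --consumer.config dc1-local-consumer.properties \
  --consumer.config dc2-local-consumer.properties \
  --producer.config aggregate-producer.properties \
  --whitelist=".*"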

Thanks,
Neha

On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
e.vanoos...@grons.nl.invalid wrote:


Hi,

We have 2 data centers that produce events. Each DC has to process events
from both DCs.

I had the following in mind:

    DC 1    |    DC 2
   events   |   events
   +  +  +  |   +  +  +
   |  |  |  |   |  |  |
   v  v  v  |   v  v  v
+----------------+ | +----------------+
| Receiver topic | | | Receiver topic |
+----------------+ | +----------------+
   |  |  mirroring |    |  |
   |  +------------|----|--+
   |  +------------|----+  |
   |  |            |    |  |
   v  v            |    v  v
+----------------+ | +----------------+
| Consumer topic | | | Consumer topic |
+----------------+ | +----------------+
   +  +  +  |   +  +  +
   |  |  |  |   |  |  |
   v  v  v  |   v  v  v
  consumers |  consumers


As each DC has a single Kafka cluster, on each DC the receiver topic and
consumer topic need to be on the same cluster.
Unfortunately, mirror maker does not seem to support mirroring to a topic
with another name.

Is there another tool we could use?
Or, is there another approach for producing and consuming from 2 DCs?

Kind regards,
Erik.

--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.nl/







--
Erik van Oosten
http://www.day-to-day-stuff.blogspot.com/



Re: [Kafka-users] Producer not distributing across all partitions

2014-10-22 Thread Mongeol Heo
Hi,

First of all, thank you for replying.
I am using 0.8.1.1.
I expect that the new producer will solve this kind of problem.

Thanks,

Mungeol

On Wed, Oct 22, 2014 at 9:51 AM, Jun Rao jun...@gmail.com wrote:

 Yes, what you did is correct. See details in

 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified

 It seems that it doesn't work all the time. What version of Kafka are you
 using?

 Thanks,

 Jun

 On Mon, Oct 20, 2014 at 9:00 PM, Mungeol Heo mungeol@gmail.com
 wrote:

  Hi,
 
  I have a question about the 'topic.metadata.refresh.interval.ms'
  configuration.
  As I understand it, its default value is 10 minutes.
  Does it mean that the producer will change the partition every 10 minutes?
  What I am experiencing is that the producer does not change to another
  partition every 10 minutes.
  Sometimes it never changed during a run that takes about 25 minutes.
  I also changed the value to 1 minute for testing.
  It looked like it worked well the first time.
  However, the same problem happens starting from the second test.
  Sometimes it takes more than 10 minutes to change the partition even
  if I set the value to 1 minute.
  Am I missing something?
  Any help will be great.
 
  Thanks.
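
A minimal sketch of the 0.8.1 producer configuration under discussion
(property names per the 0.8 docs; the broker address and topic are
illustrative):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class UnkeyedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Default is 600000 (10 minutes). Per the FAQ linked above, the old
        // producer sticks unkeyed (null-key) sends to one random partition
        // and only re-picks a partition on a metadata refresh, so lowering
        // this value makes the partition switch more often.
        props.put("topic.metadata.refresh.interval.ms", "60000");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        // A message with no key goes to the current sticky partition.
        producer.send(new KeyedMessage<String, String>("my-topic", "payload"));
        producer.close();
    }
}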
 



Re: 0.8.1.2

2014-10-22 Thread Mongeol Heo
Does 0.8.2 include the new producer mentioned in the documentation of
kafka?
If not, which version will include it?

Thanks,

Mungeol

On Wed, Oct 22, 2014 at 11:21 AM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Shlomi,

 As Jun mentioned, we are voting on a 0.8.2 beta release now. Are you
 suggesting there be an 0.8.1.2 release in addition to that? We can take a
 quick vote from the community to see how many people prefer to have this
 and why.

 Thanks,
 Neha

 On Tue, Oct 21, 2014 at 6:03 PM, Jun Rao jun...@gmail.com wrote:

  We are voting on a 0.8.2 beta release right now.
 
  Thanks,
 
  Jun
 
  On Tue, Oct 21, 2014 at 11:17 AM, Shlomi Hazan shl...@viber.com wrote:
 
   Hi All,
   Will version 0.8.1.2 happen?
   Shlomi
  
 



Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
Hello Apache Kafka users,

Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running locally),
with auto topic creation disabled, in a test I have topic created with
AdminUtils.createTopic (AdminUtils.topicExists returns true) but
KafkaProducer on send request keeps throwing
UnknownTopicOrPartitionException even after 100 retries, both when
topic.metadata.refresh.interval.ms and retry.backoff.ms are left at
defaults, and when customized.

Am I doing something wrong or is this a known bug?

How long does it typically take for metadata to be refreshed?
How long does it take for leader to be elected?

Documentation for retry.backoff.ms states:
Before each retry, the producer refreshes the metadata of relevant topics
to see if a new leader has been elected. Since leader election takes a bit
of time, this property specifies the amount of time that the producer waits
before refreshing the metadata.

Do I understand these docs correctly - on failure to send a message (such as
an unknown topic), if retries are configured, the producer will wait for the
configured retry.backoff.ms, then initiate and wait for a metadata refresh to
complete, and only then retry sending?

Kind regards,
Stevo Slavic.
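
For concreteness, a minimal sketch of the create-then-wait setup being
described (0.8.1 admin/ZK APIs called from Java; the ZK address and topic
name are illustrative). Note that topicExists only checks that the topic's
ZK path was written, while leader election happens asynchronously:

import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;

public class CreateTopicAndWait {
    public static void main(String[] args) throws InterruptedException {
        ZkClient zk = new ZkClient("127.0.0.1:2181", 10000, 10000,
                ZKStringSerializer$.MODULE$);
        AdminUtils.createTopic(zk, "test-topic", 1, 1, new Properties());
        // True as soon as the topic's ZK path exists.
        System.out.println(AdminUtils.topicExists(zk, "test-topic"));
        // The partition is only usable once a leader has been elected.
        while (ZkUtils.getLeaderForPartition(zk, "test-topic", 0).isEmpty()) {
            Thread.sleep(100);
        }
        zk.close();
    }
}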


Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Guozhang Wang
Hello Stevo,

Your understanding of the configs is correct, and it is indeed weird
that the producer gets the exception after the topic is created. Could you
use the kafka-topics command to check if the leaders exist?

kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe
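
For a healthy single-partition topic, the output would look roughly like
this (illustrative):

Topic:topic-name  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: topic-name  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

A missing leader for the partition would point at leader election not having
completed.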

Guozhang

On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote:

 Hello Apache Kafka users,

 Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running locally),
 with auto topic creation disabled, in a test I have topic created with
 AdminUtils.createTopic (AdminUtils.topicExists returns true) but
 KafkaProducer on send request keeps throwing
 UnknownTopicOrPartitionException even after 100 retries, both when
 topic.metadata.refresh.interval.ms and retry.backoff.ms are left at
 defaults, and when customized.

 Am I doing something wrong or is this a known bug?

 How long does it typically take for metadata to be refreshed?
 How long does it take for leader to be elected?

 Documentation for retry.backoff.ms states:
 Before each retry, the producer refreshes the metadata of relevant topics
 to see if a new leader has been elected. Since leader election takes a bit
 of time, this property specifies the amount of time that the producer waits
 before refreshing the metadata.

 Do I understand these docs correctly - on failure to send a message (such as
 an unknown topic), if retries are configured, the producer will wait for the
 configured retry.backoff.ms, then initiate and wait for a metadata refresh to
 complete, and only then retry sending?

 Kind regards,
 Stevo Slavic.




-- 
-- Guozhang


Re: 0.8.1.2

2014-10-22 Thread Neha Narkhede
Yes, 0.8.2 includes the new producer. 0.8.2 will have a lot of new features
which will take time to stabilize. If people want 0.8.1.2 for some critical
bug fixes, we can discuss the feasibility of doing the release.

On Wed, Oct 22, 2014 at 1:39 AM, Shlomi Hazan shl...@viber.com wrote:

 At the time I thought it was a good idea, but if I understand correctly,
 what Jun is saying is that 0.8.1.2 will not happen.
 I assume Jun sees 0.8.2 coming soon enough to remove any added value from
 0.8.1.2.
 Shlomi

 On Wed, Oct 22, 2014 at 5:21 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Shlomi,
 
  As Jun mentioned, we are voting on a 0.8.2 beta release now. Are you
  suggesting there be an 0.8.1.2 release in addition to that? We can take a
  quick vote from the community to see how many people prefer to have this
  and why.
 
  Thanks,
  Neha
 
  On Tue, Oct 21, 2014 at 6:03 PM, Jun Rao jun...@gmail.com wrote:
 
   We are voting on a 0.8.2 beta release right now.
  
   Thanks,
  
   Jun
  
   On Tue, Oct 21, 2014 at 11:17 AM, Shlomi Hazan shl...@viber.com
 wrote:
  
Hi All,
Will version 0.8.1.2 happen?
Shlomi
   
  
 



Re: frequent periods of ~1500 replicas not in sync

2014-10-22 Thread Neha Narkhede
Neil,

We fixed a bug related to the BadVersion problem in 0.8.1.1. Would you mind
repeating your test on 0.8.1.1 and if you can still reproduce this issue,
then send around the thread dump and attach the logs to KAFKA-1407?

Thanks,
Neha

On Tue, Oct 21, 2014 at 11:56 AM, Neil Harkins nhark...@gmail.com wrote:

 Hi. I've got a 5 node cluster running Kafka 0.8.1,
 with 4697 partitions (2 replicas each) across 564 topics.
 I'm sending it about 1% of our total messaging load now,
 and several times a day there is a period where 1~1500
 partitions have one replica not in sync. Is this normal?
 If a consumer is reading from a replica that gets deemed
 not in sync, does it get redirected to the good replica?
 Is there a #partitions over which maintenance tasks
 become infeasible?

 Relevant config bits:
 auto.leader.rebalance.enable=true
 leader.imbalance.per.broker.percentage=20
 leader.imbalance.check.interval.seconds=30
 replica.lag.time.max.ms=1
 replica.lag.max.messages=4000
 num.replica.fetchers=4
 replica.fetch.max.bytes=10485760

 Not necessarily correlated to those periods,
 I see a lot of these errors in the logs:

 [2014-10-20 21:23:26,999] 21963614 [ReplicaFetcherThread-3-1] ERROR
 kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-3-1], Error
 in fetch Name: FetchRequest; Version: 0; CorrelationId: 77423;
 ClientId: ReplicaFetcherThread-3-1; ReplicaId: 2; MaxWait: 500 ms;
 MinBytes: 1 bytes; RequestInfo: ...

 And a few of these:

 [2014-10-20 21:23:39,555] 3467527 [kafka-scheduler-2] ERROR
 kafka.utils.ZkUtils$  - Conditional update of path
 /brokers/topics/foo.bar/partitions/3/state with data
 {controller_epoch:11,leader:3,version:1,leader_epoch:109,isr:[3]}
 and expected version 197 failed due to
 org.apache.zookeeper.KeeperException$BadVersionException:
 KeeperErrorCode = BadVersion for
 /brokers/topics/foo.bar/partitions/3/state

 And this one I assume is a client closing the connection non-gracefully,
 thus should probably be a warning, not an error?:

 [2014-10-20 21:54:15,599] 23812214 [kafka-processor-9092-3] ERROR
 kafka.network.Processor  - Closing socket for /10.31.0.224 because of
 error

 -neil



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Todd Palino
The number of brokers doesn't really matter here, as far as I can tell,
because the question is about what a single broker can handle. The number
of partitions in the cluster is governed by the ability of the controller
to manage the list of partitions for the cluster, and the ability of each
broker to keep that list (to serve metadata requests). The number of
partitions on a single broker is governed by that broker's ability to
handle the messages and files on disk. That's a much more limiting factor
than what the controller can do.

-Todd

On Tue, Oct 21, 2014 at 2:52 PM, Neil Harkins nhark...@gmail.com wrote:

 On Tue, Oct 21, 2014 at 2:10 PM, Todd Palino tpal...@gmail.com wrote:
  As far as the number of partitions a single broker can handle, we've set
  our cap at 4000 partitions (including replicas). Above that we've seen
  some performance and stability issues.

 How many brokers? I'm curious: what kinds of problems would affect
 a single broker with a large number of partitions, but not affect the
 entire cluster with even more partitions?



Errors after reboot on single node setup

2014-10-22 Thread Ciprian Hacman
Hi,

First of all, I am new to Kafka and more of a user than a developer. I will
try to clarify things as much as possible though.

We are using Kafka as a message system for our apps, and it works nicely in
our SaaS cluster.
I am trying to make the apps also work on a single node for demo purposes.
I set up Zookeeper, Kafka and our apps on a node and things were ok until
rebooting the node. After that I see the following messages in Kafka log:

[2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up
(kafka.controller.KafkaController)
[2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete
(kafka.controller.KafkaController)
[2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data:
{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
stored data:
{jmx_port:-1,timestamp:1413994171579,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
(kafka.utils.ZkUtils$)
[2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node
[{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}]
at /brokers/ids/0 a while back in a different session, hence I will backoff
for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
[2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of
/controller changed sent to
kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882]
(org.I0Itec.zkclient.ZkEventThread)
java.lang.IllegalStateException: Kafka scheduler has not been started
at
kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
at
kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
at
kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
at
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
at
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
at
kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0
with address ip-10-91-142-54.eu-west-1.compute.internal:9092.
(kafka.utils.ZkUtils$)
[2014-10-22 16:37:28,849] INFO [Kafka Server 0], started
(kafka.server.KafkaServer)
[2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1.
(kafka.network.Processor)
[2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1.
(kafka.network.Processor)
[2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1.
(kafka.network.Processor)


The last log line repeats forever and is correlated with errors on the app
side.
Restarting Kafka fixes the errors.

I am using Kafka 0.8.2 from github to avoid
https://issues.apache.org/jira/browse/KAFKA-1451.

Does anyone have any idea why this happens and how it can be fixed?

Thanks,
Ciprian
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/


Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Todd Palino
In fact there are many more than 4000 open files. Many of our brokers run
with 28,000+ open files (regular file handles, not network connections). In
our case, we're beefing up the disk performance as much as we can by
running in a RAID-10 configuration with 14 disks.

-Todd

On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:

 Todd,

 Actually I'm wondering how Kafka handles so many partitions: with one
 partition there is at least one file on disk, and with 4000 partitions,
 there will be at least 4000 files.

 When all these partitions have write requests, how does Kafka make the write
 operations on the disk sequential (which is emphasized in the design
 document of Kafka) and make sure the disk access is effective?

 Thank you for your reply.

 xiaobinshe



 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:

  As far as the number of partitions a single broker can handle, we've set
  our cap at 4000 partitions (including replicas). Above that we've seen
  some performance and stability issues.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
  wrote:
 
   hello, everyone
  
   I'm new to Kafka. I'm wondering, what's the max number of partitions one
   single machine can handle in Kafka?
  
   Is there a suggested number?
  
   Thanks.
  
   xiaobinshe
  
 



Re: Errors after reboot on single node setup

2014-10-22 Thread Neha Narkhede
Can you provide steps to reproduce this? I'm not sure I understand how you
run into this. It does look like a bug.

On Wed, Oct 22, 2014 at 9:55 AM, Ciprian Hacman ciprian.hac...@sematext.com
 wrote:

 Hi,

 First of all, I am new to Kafka and more of a user than a developer. I will
 try to clarify things as much as possible though.

 We are using Kafka as a message system for our apps, and it works nicely in
 our SaaS cluster.
 I am trying to make the apps also work on a single node for demo purposes.
 I set up Zookeeper, Kafka and our apps on a node and things were ok until
 rebooting the node. After that I see the following messages in Kafka log:

 [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up
 (kafka.controller.KafkaController)
 [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete
 (kafka.controller.KafkaController)
 [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data:

 {jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
 stored data:

 {jmx_port:-1,timestamp:1413994171579,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
 (kafka.utils.ZkUtils$)
 [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node

 [{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}]
 at /brokers/ids/0 a while back in a different session, hence I will backoff
 for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
 [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of
 /controller changed sent to
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882]
 (org.I0Itec.zkclient.ZkEventThread)
 java.lang.IllegalStateException: Kafka scheduler has not been started
 at
 kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
 at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
 at

 kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
 at

 kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
 at kafka.utils.Utils$.inLock(Utils.scala:535)
 at

 kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
 at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
 at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
 [2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0
 with address ip-10-91-142-54.eu-west-1.compute.internal:9092.
 (kafka.utils.ZkUtils$)
 [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started
 (kafka.server.KafkaServer)
 [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1.
 (kafka.network.Processor)
 [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1.
 (kafka.network.Processor)
 [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1.
 (kafka.network.Processor)


 The last log line repeats forever and is correlated with errors on the app
 side.
 Restarting Kafka fixes the errors.

 I am using Kafka 0.8.2 from github to avoid
 https://issues.apache.org/jira/browse/KAFKA-1451.

 Does anyone have any idea why this happens and how it can be fixed?

 Thanks,
 Ciprian
 --
 Performance Monitoring * Log Analytics * Search Analytics
 Solr & Elasticsearch Support * http://sematext.com/



Re: Errors after reboot on single node setup

2014-10-22 Thread Harsha
This can be reproduced with trunk.

start zookeeper
start kafka-broker
create a topic or start a producer writing to a topic
stop zookeeper
stop kafka-broker (the kafka broker shutdown goes into WARN Session
0x14938d9dc010001 for server null, unexpected error, closing socket
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused)
kill -9 kafka-broker
restarting zookeeper and then kafka-broker leads to the error posted
by Ciprian.
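
Roughly, with the stock scripts (paths and the pgrep pattern are
illustrative):

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic test
bin/zookeeper-server-stop.sh
bin/kafka-server-stop.sh        # hangs reconnecting to ZK, see WARN above
kill -9 $(pgrep -f kafka.Kafka) # so the broker has to be killed
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties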

Ciprian,
  Can you open a jira for this.

Thanks,
Harsha

On Wed, Oct 22, 2014, at 10:03 AM, Neha Narkhede wrote:
 Can you provide steps to reproduce this? I'm not sure I understand how
 you
 run into this. It does look like a bug.
 
 On Wed, Oct 22, 2014 at 9:55 AM, Ciprian Hacman
 ciprian.hac...@sematext.com
  wrote:
 
  Hi,
 
  First of all, I am new to Kafka and more of a user than a developer. I will
  try to clarify things as much as possible though.
 
  We are using Kafka as a message system for our apps, and it works nicely in
  our SaaS cluster.
  I am trying to make the apps also work on a single node for demo purposes.
  I set up Zookeeper, Kafka and our apps on a node and things were ok until
  rebooting the node. After that I see the following messages in Kafka log:
 
  [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up
  (kafka.controller.KafkaController)
  [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup complete
  (kafka.controller.KafkaController)
  [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data:
 
  {jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
  stored data:
 
  {jmx_port:-1,timestamp:1413994171579,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
  (kafka.utils.ZkUtils$)
  [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node
 
  [{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}]
  at /brokers/ids/0 a while back in a different session, hence I will backoff
  for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
  [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of
  /controller changed sent to
  kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882]
  (org.I0Itec.zkclient.ZkEventThread)
  java.lang.IllegalStateException: Kafka scheduler has not been started
  at
  kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
  at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
  at
 
  kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
  at
 
  kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
  at
 
  kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
  at
 
  kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
  at
 
  kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
  at kafka.utils.Utils$.inLock(Utils.scala:535)
  at
 
  kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
  at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
  at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
  [2014-10-22 16:37:28,757] INFO Registered broker 0 at path /brokers/ids/0
  with address ip-10-91-142-54.eu-west-1.compute.internal:9092.
  (kafka.utils.ZkUtils$)
  [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started
  (kafka.server.KafkaServer)
  [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1.
  (kafka.network.Processor)
  [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1.
  (kafka.network.Processor)
  [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1.
  (kafka.network.Processor)
 
 
  The last log line repeats forever and is correlated with errors on the app
  side.
  Restarting Kafka fixes the errors.
 
  I am using Kafka 0.8.2 from github to avoid
  https://issues.apache.org/jira/browse/KAFKA-1451.
 
  Does anyone have any idea why this happens and how it can be fixed?
 
  Thanks,
  Ciprian
  --
  Performance Monitoring * Log Analytics * Search Analytics
  Solr & Elasticsearch Support * http://sematext.com/
 


Erratic behavior when quickly re-balancing

2014-10-22 Thread Eric Bottard
For the purpose of a unit/integration test for Spring XD, I am creating
several consumers (in the same group) in quick succession. With just 3
consumers, this triggers 2 rebalances that (on some machines) can't be
dealt with in the default 4*2000ms, and the test fails.

I have created a simple use case that reproduces this out of the context of
Spring XD. It can be found in the main() method of [1]. If it does not fail
on your machine, I believe bumping the number of creations to 4 or 5 may do
it.

So, I have a couple of questions:
- on my machine, when using various combinations of rebalance.backoff.ms
and rebalance.max.retries, this failure always seems to happen after 30s,
whatever the combination. On some other (more powerful) machines, it seems
to never fail. Is this actually cpu bound? 30s sounds a lot like twice the
default tick, so is this at all related to an ephemeral node timing out?
- given that this is for the purposes of an integration test, is there any
other parameter that I could tweak to have the system settle down faster?
- Is this something that is likely to change with the 0.9 rewrite (I saw
that the current de-centralized rebalancing mechanics are the cause of
other issues)?
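
For reference, the knobs in question on the 0.8 high-level consumer (a
minimal sketch; the values shown are the defaults, and the group id is
illustrative):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class RebalanceConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "xd-test-group");
        // A rebalance is attempted up to rebalance.max.retries times,
        // waiting rebalance.backoff.ms between attempts (4 * 2000 ms by
        // default), after which the consumer gives up with an exception.
        props.put("rebalance.backoff.ms", "2000");
        props.put("rebalance.max.retries", "4");
        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        consumer.shutdown();
    }
}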

As a workaround, I tried waiting for nodes to re-appear in ZooKeeper (code
at [2]), but this is still very slow (not to mention that this would be
very intrusive to the tests I'm trying to write).

Lastly, I should mention that I do need to create 3 consumers sequentially
(as opposed to, say, using a 3-thread consumer). The test in question simply
happens to mimic the creation of consumers that may well be on 3 separate
machines.

Best,

[1] https://gist.github.com/ericbottard/91aa9ee114c6091e5b7b
[2] https://gist.github.com/ericbottard/de87f1edc8eee2b9bee5

-- 
Eric Bottard


Re: How to produce and consume events in 2 DCs?

2014-10-22 Thread Steven Wu
Erik, I didn't know that mirrormaker can't write to a different topic, but
it might be a useful feature request for mirrormaker.

On Wed, Oct 22, 2014 at 12:21 AM, Erik van Oosten 
e.vanoos...@grons.nl.invalid wrote:

 Hi Steven,

 That doesn't work. In your proposal, mirrormaker in one DC would copy
 messages from topic A to the other DC's topic A. However, the other DC runs
 a mirrormaker which does the same, creating a loop. Messages would
 be duplicated, triplicated, etc. in a never-ending loop.

 Mirroring to another topic would work (though mirrormaker doesn't support
 that), and so would mirroring to another cluster. Neha's proposal would work
 as well, but I assume it's a lot more work for the Kafka internals and
 therefore, IMHO, wouldn't meet the KISS principle.

 Kind regards,
 Erik.


 Steven Wu wrote on 22-10-14 at 01:48:

  I think it doesn't have to be two more clusters; it can be just two more
 topics. MirrorMaker can copy from the source topics in both regions into one
 aggregate topic.

 On Tue, Oct 21, 2014 at 1:54 AM, Erik van oosten 
 e.vanoos...@grons.nl.invalid wrote:

  Thanks Neha,

 Unfortunately, the maintenance overhead of 2 more clusters is not
 acceptable to us.

 Would you accept a pull request on mirror maker that would rename topics
 on the fly?

 For example by accepting the parameter rename:
 --rename src1/dest1,src2/dest2
 or, extended with RE support:
 --rename old_(.*)/new_\1

 Kind regards,
  Erik.


 On 20 Oct 2014, at 16:43, Neha Narkhede neha.narkh...@gmail.com
 wrote the following:

  Another way to set up this kind of mirroring is by deploying 2 clusters in
 each DC - a local Kafka cluster and an aggregate Kafka cluster. The mirror
 maker copies data from both DCs' local clusters into the aggregate
 clusters. So if you want access to a topic with data from both DCs, you
 subscribe to the aggregate cluster.

 Thanks,
 Neha

 On Mon, Oct 20, 2014 at 7:07 AM, Erik van oosten 
 e.vanoos...@grons.nl.invalid wrote:

  Hi,

 We have 2 data centers that produce events. Each DC has to process events
 from both DCs.

 I had the following in mind:

     DC 1    |    DC 2
    events   |   events
    +  +  +  |   +  +  +
    |  |  |  |   |  |  |
    v  v  v  |   v  v  v
 +----------------+ | +----------------+
 | Receiver topic | | | Receiver topic |
 +----------------+ | +----------------+
    |  |  mirroring |    |  |
    |  +------------|----|--+
    |  +------------|----+  |
    |  |            |    |  |
    v  v            |    v  v
 +----------------+ | +----------------+
 | Consumer topic | | | Consumer topic |
 +----------------+ | +----------------+
    +  +  +  |   +  +  +
    |  |  |  |   |  |  |
    v  v  v  |   v  v  v
   consumers |  consumers


 As each DC has a single Kafka cluster, on each DC the receiver topic and
 consumer topic need to be on the same cluster.
 Unfortunately, mirror maker does not seem to support mirroring to a topic
 with another name.

 Is there another tool we could use?
 Or, is there another approach for producing and consuming from 2 DCs?

 Kind regards,
 Erik.

 --
 Erik van Oosten
 http://www.day-to-day-stuff.blogspot.nl/





 --
 Erik van Oosten
 http://www.day-to-day-stuff.blogspot.com/




Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Gwen Shapira
RAID-10?
Interesting choice for a system where the data is already replicated
between nodes. Is it to avoid the cost of large replication over the
network? How large are these disks?

On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote:
 In fact there are many more than 4000 open files. Many of our brokers run
 with 28,000+ open files (regular file handles, not network connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.

 -Todd

 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:

 Todd,

 Actually I'm wondering how Kafka handles so many partitions: with one
 partition there is at least one file on disk, and with 4000 partitions,
 there will be at least 4000 files.

 When all these partitions have write requests, how does Kafka make the write
 operations on the disk sequential (which is emphasized in the design
 document of Kafka) and make sure the disk access is effective?

 Thank you for your reply.

 xiaobinshe



 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:

  As far as the number of partitions a single broker can handle, we've set
  our cap at 4000 partitions (including replicas). Above that we've seen
  some performance and stability issues.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
  wrote:
 
   hello, everyone
  
   I'm new to Kafka. I'm wondering, what's the max number of partitions one
   single machine can handle in Kafka?
  
   Is there a suggested number?
  
   Thanks.
  
   xiaobinshe
  
 



Re: Errors after reboot on single node setup

2014-10-22 Thread Ciprian Hacman
Thank you for the *very* quick replies Neha, Harsha. I opened a Jira for
this issue:
https://issues.apache.org/jira/browse/KAFKA-1724

Ciprian
--
Performance Monitoring * Log Analytics * Search Analytics
Solr & Elasticsearch Support * http://sematext.com/


On Wed, Oct 22, 2014 at 8:27 PM, Harsha ka...@harsha.io wrote:

 This can be reproduced with trunk.

 start zookeeper
 start kafka-broker
 create topic or start a producer writing to a topic
 stop zookeeper
 stop kafka-broker( kafka broker shutdown goes into  WARN Session
 0x14938d9dc010001 for server null, unexpected error, closing socket
 connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
 java.net.ConnectException: Connection refused)
 kill -9 kafka-broker
 restarting zookeeper and then kafka-broker leads to the error posted
 by Ciprian.

 Ciprian,
   Can you open a jira for this.

 Thanks,
 Harsha

 On Wed, Oct 22, 2014, at 10:03 AM, Neha Narkhede wrote:
  Can you provide steps to reproduce this? I'm not sure I understand how
  you
  run into this. It does look like a bug.
 
  On Wed, Oct 22, 2014 at 9:55 AM, Ciprian Hacman
  ciprian.hac...@sematext.com
   wrote:
 
   Hi,
  
   First of all, I am new to Kafka and more of a user than a developer. I
 will
   try to clarify things as much as possible though.
  
   We are using Kafka as a message system for our apps, and it works nicely
   in our SaaS cluster.
   I am trying to make the apps also work on a single node for demo
 purposes.
   I set up Zookeeper, Kafka and our apps on a node and things were ok
 until
   rebooting the node. After that I see the following messages in Kafka
 log:
  
   [2014-10-22 16:37:22,206] INFO [Controller 0]: Controller starting up
   (kafka.controller.KafkaController)
   [2014-10-22 16:37:22,419] INFO [Controller 0]: Controller startup
 complete
   (kafka.controller.KafkaController)
   [2014-10-22 16:37:22,554] INFO conflict in /brokers/ids/0 data:
  
  
 {jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
   stored data:
  
  
 {jmx_port:-1,timestamp:1413994171579,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}
   (kafka.utils.ZkUtils$)
   [2014-10-22 16:37:22,736] INFO I wrote this conflicted ephemeral node
  
  
 [{jmx_port:-1,timestamp:1413995842465,host:ip-10-91-142-54.eu-west-1.compute.internal,version:1,port:9092}]
   at /brokers/ids/0 a while back in a different session, hence I will
 backoff
   for this node to be deleted by Zookeeper and retry
 (kafka.utils.ZkUtils$)
   [2014-10-22 16:37:25,010] ERROR Error handling event ZkEvent[Data of
   /controller changed sent to
   kafka.server.ZookeeperLeaderElector$LeaderChangeListener@a6af882]
   (org.I0Itec.zkclient.ZkEventThread)
   java.lang.IllegalStateException: Kafka scheduler has not been started
   at
   kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
   at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
   at
  
  
 kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
   at
  
  
 kafka.controller.KafkaController$$anonfun$2.apply$mcV$sp(KafkaController.scala:162)
   at
  
  
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:138)
   at
  
  
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
   at
  
  
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:134)
   at kafka.utils.Utils$.inLock(Utils.scala:535)
   at
  
  
 kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:134)
   at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:549)
   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
   [2014-10-22 16:37:28,757] INFO Registered broker 0 at path
 /brokers/ids/0
   with address ip-10-91-142-54.eu-west-1.compute.internal:9092.
   (kafka.utils.ZkUtils$)
   [2014-10-22 16:37:28,849] INFO [Kafka Server 0], started
   (kafka.server.KafkaServer)
   [2014-10-22 16:38:56,718] INFO Closing socket connection to /127.0.0.1
 .
   (kafka.network.Processor)
   [2014-10-22 16:38:56,850] INFO Closing socket connection to /127.0.0.1
 .
   (kafka.network.Processor)
   [2014-10-22 16:38:56,985] INFO Closing socket connection to /127.0.0.1
 .
   (kafka.network.Processor)
  
  
   The last log line repeats forever and is correlated with errors on the
 app
   side.
   Restarting Kafka fixes the errors.
  
   I am using Kafka 0.8.2 from github to avoid
   https://issues.apache.org/jira/browse/KAFKA-1451.
  
   Does anyone have any idea why this happens and how it can be fixed?
  
   Thanks,
   Ciprian
   --
   Performance Monitoring * Log Analytics * Search Analytics
   Solr & Elasticsearch Support * 

Re: Sizing Cluster

2014-10-22 Thread Pete Wright


On 10/21/14 21:13, István wrote:
 Hi Pete,
 
 Yes, you are right, both nodes have all of the data. I was just wondering
 what the scenario is for losing one node; in production it might not fly.
 If this is for testing only, you are good.
 
 Answering your question, I think the retention policy (log.retention.hours)
 is for controlling disk utilization. Disk IO (the log.flush.* section) and
 network IO (num.network.threads, etc.) saturation you might want to measure
 during tests and spec based on that. Here is a link with examples for the
 full list of relevant settings, with more description:
 https://kafka.apache.org/08/ops.html.
 
  I guess the most important question is how many clients you want to
 support. You could work out how much space you need based on that, assuming
 a few things. For more complete documentation refer to:
 https://kafka.apache.org/08/configuration.html
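
 For illustration, the knobs named above as they might appear in
 server.properties (example values only; 0.8 property names):

 # retention bounds disk utilization
 log.retention.hours=168
 # flush policy shapes disk IO
 log.flush.interval.messages=10000
 log.flush.interval.ms=1000
 # network IO capacity
 num.network.threads=3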

Thanks Istvan - this is helpful.

Cheers,
-pete

-- 
Pete Wright
Systems Architect
Rubicon Project
pwri...@rubiconproject.com
310.309.9298


Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
There are various costs when a broker fails, including broker leader election
for each partition, etc., as well as exposing possible issues for in-flight
messages, client rebalancing, etc.

So even though replication provides partition redundancy, RAID 10 on each
broker is usually a good tradeoff: it prevents the most common cause of
broker server failure (disk failure) and makes for smoother operation
overall.

Best Regards,

-Jonathan


On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote:

 RAID-10?
 Interesting choice for a system where the data is already replicated
 between nodes. Is it to avoid the cost of large replication over the
 network? how large are these disks?
 
 On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote:
 In fact there are many more than 4000 open files. Many of our brokers run
 with 28,000+ open files (regular file handles, not network connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:
 
 Todd,
 
 Actually I'm wondering how Kafka handles so many partitions: with one
 partition there is at least one file on disk, and with 4000 partitions,
 there will be at least 4000 files.
 
 When all these partitions have write requests, how does Kafka make the write
 operations on the disk sequential (which is emphasized in the design
 document of Kafka) and make sure the disk access is effective?
 
 Thank you for your reply.
 
 xiaobinshe
 
 
 
 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
 As far as the number of partitions a single broker can handle, we've set
 our cap at 4000 partitions (including replicas). Above that we've seen
 some performance and stability issues.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
 hello, everyone
 
 I'm new to Kafka. I'm wondering, what's the max number of partitions one
 single machine can handle in Kafka?
 
 Is there a suggested number?
 
 Thanks.
 
 xiaobinshe
 
 
 



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Gwen Shapira
Makes sense. Thanks :)

On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
jonathanbwe...@gmail.com wrote:
 There are various costs when a broker fails, including broker leader
 election for each partition, etc., as well as exposing possible issues for
 in-flight messages, client rebalancing, etc.

 So even though replication provides partition redundancy, RAID 10 on each
 broker is usually a good tradeoff: it prevents the most common cause of
 broker server failure (disk failure) and makes for smoother operation
 overall.

 Best Regards,

 -Jonathan


 On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote:

 RAID-10?
 Interesting choice for a system where the data is already replicated
 between nodes. Is it to avoid the cost of large replication over the
 network? how large are these disks?

 On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote:
 In fact there are many more than 4000 open files. Many of our brokers run
 with 28,000+ open files (regular file handles, not network connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.

 -Todd

 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:

 Todd,

 Actually I'm wondering how Kafka handles so many partitions: with one
 partition there is at least one file on disk, and with 4000 partitions,
 there will be at least 4000 files.

 When all these partitions have write requests, how does Kafka make the write
 operations on the disk sequential (which is emphasized in the design
 document of Kafka) and make sure the disk access is effective?

 Thank you for your reply.

 xiaobinshe



 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:

 As far as the number of partitions a single broker can handle, we've set
 our cap at 4000 partitions (including replicas). Above that we've seen
 some performance and stability issues.

 -Todd

 On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
 wrote:

 hello, everyone

 I'm new to Kafka. I'm wondering, what's the max number of partitions one
 single machine can handle in Kafka?

 Is there a suggested number?

 Thanks.

 xiaobinshe






Re: Performance issues

2014-10-22 Thread Mohit Anchlia
I can't find this property in the server.properties file. Is that the right
place to set this parameter?
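
For reference, a sketch of the suggestion - replica.fetch.wait.max.ms is not
listed in the stock server.properties, but broker configs can be added to it
(the value here is illustrative; the default is 500):

# server.properties (broker side)
replica.fetch.wait.max.ms=100
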
On Tue, Oct 21, 2014 at 6:27 PM, Jun Rao jun...@gmail.com wrote:

 Could you also set replica.fetch.wait.max.ms in the broker to sth much
 smaller?

 Thanks,

 Jun

 On Tue, Oct 21, 2014 at 2:15 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

  I set the property to 1 in the consumer config that is passed to
  createJavaConsumerConnector, but it didn't seem to help:
 
  props.put("fetch.wait.max.ms", fetchMaxWait);
 
  On Tue, Oct 21, 2014 at 1:21 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   This is a consumer config:
  
   fetch.wait.max.ms
  
   On Tue, Oct 21, 2014 at 11:39 AM, Mohit Anchlia 
 mohitanch...@gmail.com
   wrote:
  
Is this a parameter I need to set in the kafka server or on the client
side?
Also, can you help point out which one exactly is the consumer max wait
time from this list?
   
https://kafka.apache.org/08/configuration.html
   
On Tue, Oct 21, 2014 at 11:35 AM, Jay Kreps jay.kr...@gmail.com
  wrote:
   
 There was a bug that could lead to the fetch request from the consumer
 hitting its timeout instead of being immediately triggered by the produce
 request. To see if you are affected by that, set your consumer max wait
 time to 1 ms and see if the latency drops to 1 ms (or, alternately, try
 with trunk and see if that fixes the problem).

 The reason I suspect this problem is that the default timeout in the
 Java consumer is 100 ms.

 -Jay

 On Tue, Oct 21, 2014 at 11:06 AM, Mohit Anchlia 
   mohitanch...@gmail.com
 wrote:

  This is the version I am using: kafka_2.10-0.8.1.1
 
  I think this is fairly recent version
  On Tue, Oct 21, 2014 at 10:57 AM, Jay Kreps jay.kr...@gmail.com
 
wrote:
 
   What version of Kafka is this? Can you try the same test against trunk?
   We fixed a couple of latency-related bugs which may be the cause.
  
   -Jay
  
   On Tue, Oct 21, 2014 at 10:50 AM, Mohit Anchlia 
 mohitanch...@gmail.com
   wrote:
  
It's consistently close to 100ms, which makes me believe that there are
some settings that I might have to tweak; however, I am not sure how to
confirm that assumption :)
On Tue, Oct 21, 2014 at 8:53 AM, Mohit Anchlia 
 mohitanch...@gmail.com
  
wrote:
   
 I have a Java test that produces messages and then a consumer consumes
 them. Consumers are active all the time. There is 1 consumer for 1
 producer. I am measuring the time between when a message is successfully
 written to the queue and when the consumer picks it up.

 On Tue, Oct 21, 2014 at 8:32 AM, Neha Narkhede 
   neha.narkh...@gmail.com
 wrote:

 Can you give more information about the performance test? Which test?
 Which queue? How did you measure the dequeue latency?

 On Mon, Oct 20, 2014 at 5:09 PM, Mohit Anchlia 
   mohitanch...@gmail.com
 wrote:

  I am running a performance test, and from what I am seeing, messages
  are taking about 100ms to pop from the queue itself, hence making the
  test slow. I am looking for pointers on how I can troubleshoot this
  issue.

  There seems to be plenty of CPU and IO available. I am running 22
  producers and 22 consumers in the same group.
 



   
  
 

   
  
  
  
   --
   -- Guozhang
  
 



Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
kafka-topics.sh execution, from latest trunk:

~/git/oss/kafka [trunk|✔]
21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
059915e6-56ef-4b8e-8e95-9f676313a01c --describe
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Error while executing topic command next on empty iterator
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)


Output from the same command on the 0.8.1 branch is cleaner, but shows the
same exception:

~/git/oss/kafka [0.8.1|✔]
21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
059915e6-56ef-4b8e-8e95-9f676313a01c --describe
Error while executing topic command null
java.util.NoSuchElementException
at scala.collection.IterableLike$class.head(IterableLike.scala:101)
at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Stevo,

  Your understanding of the configs is correct, and it is indeed weird
  that the producer gets the exception after the topic is created. Could you
  use the kafka-topics command to check if the leaders exist?

  kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe

 Guozhang

 On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote:

  Hello Apache Kafka users,
 
  Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running
 locally),
  with auto topic creation disabled, in a test I have topic created with
  AdminUtils.createTopic (AdminUtils.topicExists returns true) but
  KafkaProducer on send request keeps throwing
  UnknownTopicOrPartitionException even after 100 retries, both when
  topic.metadata.refresh.interval.ms and retry.backoff.ms are left at
  defaults, and when customized.
 
  Am I doing something wrong or is this a known bug?
 
  How long does it typically take for metadata to be refreshed?
  How long does it take for leader to be elected?
 
  Documentation for retry.backoff.ms states:
  Before each retry, the producer refreshes the metadata of relevant
 topics
  to see if a new leader has been elected. Since leader election takes a
 bit
  of time, this property specifies the amount of time that the producer
 waits
  before refreshing the metadata.
 
  Do I understand these docs correctly - on failure to send a message (such
  as an unknown topic), if retries are configured, the producer will wait for
  the configured retry.backoff.ms, then initiate and wait for a metadata
  refresh to complete, and only then retry sending?
 
  Kind regards,
  Stevo Slavic.
 



 --
 -- Guozhang



Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
Output on trunk is clean too, after a clean build:

~/git/oss/kafka [trunk|✔]
22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
059915e6-56ef-4b8e-8e95-9f676313a01c --describe
Error while executing topic command next on empty iterator
java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IterableLike$class.head(IterableLike.scala:91)
at scala.collection.AbstractIterable.head(Iterable.scala:54)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
at
kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić ssla...@gmail.com wrote:

 kafka-topics.sh execution, from latest trunk:

 ~/git/oss/kafka [trunk|✔]
 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 Error while executing topic command next on empty iterator
 java.util.NoSuchElementException: next on empty iterator
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
 at scala.collection.IterableLike$class.head(IterableLike.scala:91)
 at scala.collection.AbstractIterable.head(Iterable.scala:54)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)


 Output from same command on 0.8.1 branch is better, but still same
 exception:

 ~/git/oss/kafka [0.8.1|✔]
 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
 Error while executing topic command null
 java.util.NoSuchElementException
 at scala.collection.IterableLike$class.head(IterableLike.scala:101)
 at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
 at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)

 On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Stevo,

 Your understanding of the configs is correct, and it is indeed weird
 that the producer gets the exception after the topic is created. Could you
 use the kafka-topics command to check if the leaders exist?

 kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe

 Guozhang

 On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote:

  Hello Apache Kafka users,
 
  Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running
 locally),
  with auto topic creation disabled, in a test I have topic created with
  AdminUtils.createTopic (AdminUtils.topicExists returns true) but
  KafkaProducer on send request keeps throwing
  UnknownTopicOrPartitionException even after 100 retries, both when
  topic.metadata.refresh.interval.ms and retry.backoff.ms are left at
  defaults, and when customized.
 
  Am I doing something wrong or is this a known bug?
 
  How long does it typically take for metadata to be refreshed?
  How long does it take for leader to be elected?
 
  Documentation for retry.backoff.ms states:
  Before each retry, the producer refreshes the metadata of relevant
 topics
  to 

Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Neha Narkhede
In my experience, RAID 10 doesn't really provide value in the presence of
replication. When a disk fails, the RAID resync process is so I/O intensive
that it renders the broker useless until it completes. When this happens,
you actually have to take the broker out of rotation and move the leaders
off of it to prevent it from serving requests in a degraded state. You
might as well shutdown the broker, delete the broker's data and let it
catch up from the leader.

On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Makes sense. Thanks :)

 On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
  There are various costs when a broker fails, including broker leader
 election for each partition, etc., as well as exposing possible issues for
 in-flight messages, client rebalancing, etc.
 
  So even though replication provides partition redundancy, RAID 10 on
 each broker is usually a good tradeoff: it prevents the most common cause
 of broker server failure (disk failure) and makes for smoother operation
 overall.
 
  Best Regards,
 
  -Jonathan
 
 
  On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  RAID-10?
  Interesting choice for a system where the data is already replicated
  between nodes. Is it to avoid the cost of large replication over the
  network? How large are these disks?
 
  On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com
 wrote:
  In fact there are many more than 4000 open files. Many of our brokers
 run
  with 28,000+ open files (regular file handles, not network
 connections). In
  our case, we're beefing up the disk performance as much as we can by
  running in a RAID-10 configuration with 14 disks.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
  Todd,
 
  Actually I'm wondering how Kafka handles so many partitions. With one
  partition there is at least one file on disk, and with 4000 partitions,
  there will be at least 4000 files.
 
  When all these partitions have write requests, how does Kafka make the
  write operations on the disk sequential (which is emphasized in the
  design document of Kafka) and make sure the disk access is effective?
 
  Thank you for your reply.
 
  xiaobinshe
 
 
 
  2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
  As far as the number of partitions a single broker can handle, we've
 set
  our cap at 4000 partitions (including replicas). Above that we've
 seen
  some
  performance and stability issues.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
  wrote:
 
  Hello, everyone.
 
  I'm new to Kafka. I'm wondering what's the max number of partitions one
  single machine can handle in Kafka?
 
  Is there a suggested number?
 
  Thanks.
 
  xiaobinshe
 
 
 
 



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
Neha, 

Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is definitely 
very painful, but less so with RAID 10.

We have been using the guidance here:

http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site Reliability 
Engineers state they run RAID 10 on all Kafka clusters @34:40 or so)

Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations

LinkedIn
Hardware
We are using dual quad-core Intel Xeon machines with 24GB of memory. In general 
this should not matter too much, we only see pretty low CPU usage at peak even 
with GZIP compression enabled and a number of clients that don't batch 
requests. The memory is probably more than is needed for caching the active 
segments of the log.
The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID 10 
array. In general this is the performance bottleneck, and more disks is
better. Depending on how you configure flush behavior you may or may not 
benefit from more expensive disks (if you flush often then higher RPM SAS 
drives may be better).
OS Settings
We use Linux. Ext4 is the filesystem and we run using software RAID 10. We 
haven't benchmarked filesystems so other filesystems may be superior.
We have added two tuning changes: (1) we upped the number of file descriptors 
since we have lots of topics and lots of connections, and (2) we upped the max 
socket buffer size to enable high-performance data transfer between data 
centers (described here).


Best Regards,

-Jonathan



On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 In my experience, RAID 10 doesn't really provide value in the presence of
 replication. When a disk fails, the RAID resync process is so I/O intensive
 that it renders the broker useless until it completes. When this happens,
 you actually have to take the broker out of rotation and move the leaders
 off of it to prevent it from serving requests in a degraded state. You
 might as well shut down the broker, delete the broker's data and let it
 catch up from the leader.
 
 On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
 Makes sense. Thanks :)
 
 On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
 There are various costs when a broker fails: broker leader election for
 each partition, possible issues for in-flight messages, client rebalancing,
 and so on.
 
 So even though replication provides partition redundancy, RAID 10 on
 each broker is usually a good tradeoff: it guards against the most common
 cause of broker server failure (disk failure) and makes for smoother
 operation overall.
 
 Best Regards,
 
 -Jonathan
 
 
 On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
 RAID-10?
 Interesting choice for a system where the data is already replicated
 between nodes. Is it to avoid the cost of large replication over the
 network? How large are these disks?
 
 On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com
 wrote:
 In fact there are many more than 4000 open files. Many of our brokers
 run
 with 28,000+ open files (regular file handles, not network
 connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
 Todd,
 
 Actually I'm wondering how Kafka handles so many partitions. With one
 partition there is at least one file on disk, and with 4000 partitions,
 there will be at least 4000 files.
 
 When all these partitions have write requests, how does Kafka make the
 write operations on the disk sequential (which is emphasized in the
 design document of Kafka) and make sure the disk access is effective?
 
 Thank you for your reply.
 
 xiaobinshe
 
 
 
 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
 As far as the number of partitions a single broker can handle, we've
 set
 our cap at 4000 partitions (including replicas). Above that we've
 seen
 some
 performance and stability issues.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
 Hello, everyone.
 
 I'm new to Kafka. I'm wondering what's the max number of partitions one
 single machine can handle in Kafka?
 
 Is there a suggested number?
 
 Thanks.
 
 xiaobinshe
 
 
 
 
 



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Todd Palino
Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until
recently, sat 8 feet from my desk. The data from the wiki page is off a
little bit as well (we're running 14 disks now, and 64 GB systems).

So to hit the first questions, RAID 10 gives higher read performance, and
also allows you to suffer a disk failure without having to drop the entire
cluster. As Neha noted, you're going to take a hit on the rebuild, and
because of ongoing traffic in the cluster it will be for a long time (we
can easily take half a day to rebuild a disk). But you still get some
benefit out of the RAID over just killing the data and letting it rebuild
from the replica, because during that time the cluster is not under
replicated, so you can suffer another failure. The more servers and disks
you have, the more often disks are going to fail, not to mention other
components. Both hardware and software. I like running on the safer side.

That said, I'm not sure RAID 10 is the answer either. We're going to be
doing some experimenting with other disk layouts shortly. We've inherited a
lot of our architecture, and many things have changed in that time. We're
probably going to test out RAID 5 and 6 to start with and see how much we
lose from the parity calculations.

-Todd


On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks jonathanbwe...@gmail.com
wrote:

 Neha,

 Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is
 definitely very painful, but less so with RAID 10.

 We have been using the guidance here:

 http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site
 Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40
 or so)

 Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations

 LinkedIn
 Hardware
 We are using dual quad-core Intel Xeon machines with 24GB of memory. In
 general this should not matter too much, we only see pretty low CPU usage
 at peak even with GZIP compression enabled and a number of clients that
 don't batch requests. The memory is probably more than is needed for
 caching the active segments of the log.
 The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID
 10 array. In general this is the performance bottleneck, and more disks is
 better. Depending on how you configure flush behavior you may or may
 not benefit from more expensive disks (if you flush often then higher RPM
 SAS drives may be better).
 OS Settings
 We use Linux. Ext4 is the filesystem and we run using software RAID 10. We
 haven't benchmarked filesystems so other filesystems may be superior.
 We have added two tuning changes: (1) we upped the number of file
 descriptors since we have lots of topics and lots of connections, and (2)
 we upped the max socket buffer size to enable high-performance data
 transfer between data centers (described here).


 Best Regards,

 -Jonathan



 On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  In my experience, RAID 10 doesn't really provide value in the presence of
  replication. When a disk fails, the RAID resync process is so I/O
 intensive
  that it renders the broker useless until it completes. When this happens,
  you actually have to take the broker out of rotation and move the leaders
  off of it to prevent it from serving requests in a degraded state. You
  might as well shut down the broker, delete the broker's data and let it
  catch up from the leader.
 
  On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
  Makes sense. Thanks :)
 
  On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
  jonathanbwe...@gmail.com wrote:
  There are various costs when a broker fails: broker leader election for
  each partition, possible issues for in-flight messages, client rebalancing,
  and so on.
 
  So even though replication provides partition redundancy, RAID 10 on
  each broker is usually a good tradeoff: it guards against the most common
  cause of broker server failure (disk failure) and makes for smoother
  operation overall.
 
  Best Regards,
 
  -Jonathan
 
 
  On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
  RAID-10?
  Interesting choice for a system where the data is already replicated
  between nodes. Is it to avoid the cost of large replication over the
  network? How large are these disks?
 
  On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com
  wrote:
  In fact there are many more than 4000 open files. Many of our brokers
  run
  with 28,000+ open files (regular file handles, not network
  connections). In
  our case, we're beefing up the disk performance as much as we can by
  running in a RAID-10 configuration with 14 disks.
 
  -Todd
 
  On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com
  wrote:
 
  Todd,
 
  Actually I'm wondering how Kafka handles so many partitions. With one
  partition there is at least one file on disk, and with 4000 partitions,
  there will be at least 4000 files.

Re: Explicit topic creation and topic metadata availability

2014-10-22 Thread Stevo Slavić
I still have to understand what is going on, but when I set
kafka.utils.ZKStringSerializer as the ZkSerializer for the ZkClient used in
the AdminUtils calls, KafkaProducer could see the created topic...
The default ZkSerializer is
org.I0Itec.zkclient.serialize.SerializableSerializer.

Kind regards,
Stevo Slavic.
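
For reference, a minimal sketch of the fix described above against the 0.8.1
API. The essential part is passing kafka.utils.ZKStringSerializer when
constructing the ZkClient handed to AdminUtils; the default
SerializableSerializer writes Java-serialized bytes, whereas the broker
expects plain UTF-8 strings in ZooKeeper (topic name, partition counts and
connect string below are placeholders):

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// ZKStringSerializer writes plain strings, which the broker can parse back.
val zkClient = new ZkClient("127.0.0.1:50194", 30000, 30000, ZKStringSerializer)
AdminUtils.createTopic(zkClient, "my-topic", 1, 1, new Properties())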

On Wed, Oct 22, 2014 at 10:03 PM, Stevo Slavić ssla...@gmail.com wrote:

 Output on trunk is clean too, after a clean build:

 ~/git/oss/kafka [trunk|✔]
 22:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
 Error while executing topic command next on empty iterator
 java.util.NoSuchElementException: next on empty iterator
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
 at scala.collection.IterableLike$class.head(IterableLike.scala:91)
 at scala.collection.AbstractIterable.head(Iterable.scala:54)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)

 On Wed, Oct 22, 2014 at 9:45 PM, Stevo Slavić ssla...@gmail.com wrote:

 kafka-topics.sh execution, from latest trunk:

 ~/git/oss/kafka [trunk|✔]
 21:00 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in
 [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in
 [jar:file:/Users/d062007/git/oss/kafka/core/build/dependant-libs-2.10.1/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 Error while executing topic command next on empty iterator
 java.util.NoSuchElementException: next on empty iterator
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
 at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
 at scala.collection.IterableLike$class.head(IterableLike.scala:91)
 at scala.collection.AbstractIterable.head(Iterable.scala:54)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:170)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:160)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:160)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)


 Output from the same command on the 0.8.1 branch is better, but still the
 same exception:

 ~/git/oss/kafka [0.8.1|✔]
 21:12 $ bin/kafka-topics.sh --zookeeper 127.0.0.1:50194 --topic
 059915e6-56ef-4b8e-8e95-9f676313a01c --describe
 Error while executing topic command null
 java.util.NoSuchElementException
 at scala.collection.IterableLike$class.head(IterableLike.scala:101)
 at scala.collection.immutable.Map$EmptyMap$.head(Map.scala:73)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:137)
 at
 kafka.admin.TopicCommand$$anonfun$describeTopic$1.apply(TopicCommand.scala:127)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
 at kafka.admin.TopicCommand$.describeTopic(TopicCommand.scala:127)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:56)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)

 On Wed, Oct 22, 2014 at 5:30 PM, Guozhang Wang wangg...@gmail.com
 wrote:

 Hello Stevo,

 Your understanding of the configs is correct, and it is indeed weird
 that the producer gets the exception after the topic is created. Could you
 use
 the kafka-topics command to check if the leaders exist?

 kafka-topics.sh --zookeeper XXX --topic [topic-name] --describe

 Guozhang

 On Wed, Oct 22, 2014 at 5:57 AM, Stevo Slavić ssla...@gmail.com wrote:

  Hello Apache Kafka users,
 
  Using Kafka 0.8.1.1 (single instance with single ZK 3.4.6 running
 locally),
  with auto topic creation disabled, in a test I have a topic created with
  AdminUtils.createTopic (AdminUtils.topicExists returns true), but
  KafkaProducer on send request keeps throwing
  UnknownTopicOrPartitionException even after 100 retries, both when
  

Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
I suppose it is also going to depend on:

a) How much spare I/O bandwidth the brokers have to support a rebuild while
serving ongoing requests. Our brokers have spare I/O capacity.
b) How many brokers are in the cluster and what the replication factor is — 
e.g. if you have a larger cluster, it is easier to tolerate the loss of a 
single broker. We started with 3 brokers, so the loss of a single broker is 
quite significant — we would prefer possibly degraded performance to having a 
“down” broker.

I do understand that y’all both work at LinkedIn; my point is that all of the
guidance to date (as recently as this summer) is that in production LinkedIn
runs on RAID 10, so it is just a bit odd to hear a contrary recommendation,
although I do understand that best practices are a moving, evolving target.

Best Regards,

-Jonathan


On Oct 22, 2014, at 4:05 PM, Todd Palino tpal...@gmail.com wrote:

 Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until
 recently, sat 8 feet from my desk. The data from the wiki page is off a
 little bit as well (we're running 14 disks now, and 64 GB systems).
 
 So to hit the first questions, RAID 10 gives higher read performance, and
 also allows you to suffer a disk failure without having to drop the entire
 cluster. As Neha noted, you're going to take a hit on the rebuild, and
 because of ongoing traffic in the cluster it will be for a long time (we
 can easily take half a day to rebuild a disk). But you still get some
 benefit out of the RAID over just killing the data and letting it rebuild
 from the replica, because during that time the cluster is not under
 replicated, so you can suffer another failure. The more servers and disks
 you have, the more often disks are going to fail, not to mention other
 components. Both hardware and software. I like running on the safer side.
 
 That said, I'm not sure RAID 10 is the answer either. We're going to be
 doing some experimenting with other disk layouts shortly. We've inherited a
 lot of our architecture, and many things have changed in that time. We're
 probably going to test out RAID 5 and 6 to start with and see how much we
 lose from the parity calculations.
 
 -Todd
 
 
 On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks jonathanbwe...@gmail.com
 wrote:
 
 Neha,
 
 Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is
 definitely very painful, but less so with RAID 10.
 
 We have been using the guidance here:
 
 http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site
 Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40
 or so)
 
 Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations
 
 LinkedIn
 Hardware
 We are using dual quad-core Intel Xeon machines with 24GB of memory. In
 general this should not matter too much, we only see pretty low CPU usage
 at peak even with GZIP compression enabled and a number of clients that
 don't batch requests. The memory is probably more than is needed for
 caching the active segments of the log.
 The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID
 10 array. In general this is the performance bottleneck, and more disks is
 better. Depending on how you configure flush behavior you may or may
 not benefit from more expensive disks (if you flush often then higher RPM
 SAS drives may be better).
 OS Settings
 We use Linux. Ext4 is the filesystem and we run using software RAID 10. We
 haven't benchmarked filesystems so other filesystems may be superior.
 We have added two tuning changes: (1) we upped the number of file
 descriptors since we have lots of topics and lots of connections, and (2)
 we upped the max socket buffer size to enable high-performance data
 transfer between data centers (described here).
 
 
 Best Regards,
 
 -Jonathan
 
 
 
 On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
 
 In my experience, RAID 10 doesn't really provide value in the presence of
 replication. When a disk fails, the RAID resync process is so I/O
 intensive
 that it renders the broker useless until it completes. When this happens,
 you actually have to take the broker out of rotation and move the leaders
 off of it to prevent it from serving requests in a degraded state. You
 might as well shut down the broker, delete the broker's data and let it
 catch up from the leader.
 
 On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
 Makes sense. Thanks :)
 
 On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
 There are various costs when a broker fails: broker leader election for
 each partition, possible issues for in-flight messages, client rebalancing,
 and so on.
 
 So even though replication provides partition redundancy, RAID 10 on
 each broker is usually a good tradeoff: it guards against the most common
 cause of broker server failure (disk failure) and makes for smoother
 operation overall.
 

Re: Erratic behavior when quickly re-balancing

2014-10-22 Thread Guozhang Wang
Hello Eric,

1) The rebalance failures are mainly due to ZK session timeouts; you could try
increasing your ZK session timeout value and see if that helps.

2) The new consumer in the 0.9 rewrite will resolve this problem by getting
rid of the ZK dependency and using a centralized coordinator for rebalance
logic.

Guozhang
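
A minimal sketch of those settings for the 0.8.x high-level consumer (property
names per the 0.8.1 consumer configs; the values are illustrative only, not
recommendations):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "127.0.0.1:2181")
props.put("group.id", "test-group")
// A larger ZK session timeout keeps a slow test machine from expiring the
// consumer's ephemeral node mid-rebalance and triggering yet another rebalance.
props.put("zookeeper.session.timeout.ms", "12000")
// More room per rebalance attempt, and more attempts overall.
props.put("rebalance.backoff.ms", "4000")
props.put("rebalance.max.retries", "8")

val connector = Consumer.create(new ConsumerConfig(props))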

On Wed, Oct 22, 2014 at 9:32 AM, Eric Bottard ebott...@pivotal.io wrote:

 For the purpose of a unit/integration test for Spring XD, I am creating
 several consumers (in the same group) in quick succession. With just 3
 consumers, this triggers 2 rebalances that (on some machines) can't be
 dealt with in the default 4*2000 ms, and the test fails.

 I have created a simple use case that reproduces this out of the context of
 Spring XD. It can be found in the main() method of [1]. If it does not fail
 on your machine, I believe bumping the number of creations to 4 or 5 may do
 it.

 So, I have a couple of questions:
 - on my machine, when using various combinations of rebalance.backoff.ms
 and rebalance.max.retries, this failure always seems to happen after 30s,
 whatever the combination. On some other (more powerful) machines, it seems
 to never fail. Is this actually CPU bound? 30s sounds a lot like twice the
 default tick, so is this at all related to an ephemeral node timing out?
 - given that this is for the purposes of an integration test, is there any
 other parameter that I could tweak to have the system settle down faster?
 - Is this something that is likely to change with the 0.9 rewrite (I saw
 that the current de-centralized rebalancing mechanics are the cause of
 other issues)?

 As a workaround, I tried waiting for nodes to re-appear in ZooKeeper (code
 at [2]), but this is still very slow (not to mention that this would be
 very intrusive to the tests I'm trying to write).

 Lastly, I should mention that I do need to create 3 consumers sequentially
 (as opposed to, say, using a 3-thread consumer). The test in question simply
 happens to mimic the creation of consumers that may well be on 3 separate
 machines.

 Best,

 [1] https://gist.github.com/ericbottard/91aa9ee114c6091e5b7b
 [2] https://gist.github.com/ericbottard/de87f1edc8eee2b9bee5

 --
 Eric Bottard




-- 
-- Guozhang
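
A sketch of the ZooKeeper-polling workaround Eric mentions in [2], under the
assumption that the test can reach the same ZooKeeper ensemble as the
consumers. The 0.8.x high-level consumer registers each group member as an
ephemeral node under /consumers/<group>/ids, so a test can poll that path
until all expected members have appeared (waitForMembers is a hypothetical
test helper, not a Kafka API):

import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Hypothetical helper: block until `expected` consumers of `group` have
// registered in ZooKeeper, or time out.
def waitForMembers(zk: ZkClient, group: String, expected: Int,
                   timeoutMs: Long): Boolean = {
  val path = s"/consumers/$group/ids"
  val deadline = System.currentTimeMillis() + timeoutMs
  while (System.currentTimeMillis() < deadline) {
    val count = if (zk.exists(path)) zk.getChildren(path).size else 0
    if (count == expected) return true
    Thread.sleep(200)
  }
  false
}

Note that registration only proves membership, not that partition ownership
has settled, which is presumably why the workaround still feels slow and
intrusive.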


Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Xiaobin She
Todd,

Thank you for the information.

With 28,000+ files and 14 disks, that means there are on average about 4000
open files per pair of disks (each mirrored pair being treated as one single
disk), am I right?

How do you manage to make all the write operations to these 4000 open
files sequential on the disk?

As far as I know, write operations to different files on the same disk will
cause random writes, which is not good for performance.

xiaobinshe
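
A toy illustration (not Kafka code) of the append-only pattern the design
document relies on: each partition writes only to the tail of its active
segment file, so every individual file sees purely sequential writes, and the
OS page cache absorbs those appends and flushes them to disk in large batches,
which is what keeps thousands of open segment files from degenerating into
small random writes (the paths below are made up):

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, Paths, StandardOpenOption}

// One "active segment" per partition, opened for appending only.
val dir = Paths.get("partition-0")
Files.createDirectories(dir)
val segment = FileChannel.open(
  dir.resolve("00000000000000000000.log"),
  StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)

// Every write lands at the current end of the file: sequential within the file.
segment.write(ByteBuffer.wrap("message-bytes".getBytes("UTF-8")))
segment.close()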




2014-10-23 1:00 GMT+08:00 Todd Palino tpal...@gmail.com:

 In fact there are many more than 4000 open files. Many of our brokers run
 with 28,000+ open files (regular file handles, not network connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.

 -Todd

 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:

  Todd,
 
  Actually I'm wondering how Kafka handles so many partitions. With one
  partition there is at least one file on disk, and with 4000 partitions,
  there will be at least 4000 files.
 
  When all these partitions have write requests, how does Kafka make the
  write operations on the disk sequential (which is emphasized in the design
  document of Kafka) and make sure the disk access is effective?
 
  Thank you for your reply.
 
  xiaobinshe
 
 
 
  2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
   As far as the number of partitions a single broker can handle, we've
 set
   our cap at 4000 partitions (including replicas). Above that we've seen
  some
   performance and stability issues.
  
   -Todd
  
   On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
   wrote:
  
Hello, everyone.

I'm new to Kafka. I'm wondering what's the max number of partitions one
single machine can handle in Kafka?

Is there a suggested number?
   
Thanks.
   
xiaobinshe