Re: kafka performance question

2014-05-14 Thread Jun Rao
How many brokers and partitions do you have? You may try increasing
batch.num.messages.
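
A minimal sketch, assuming the old 0.8.x async producer and the property names quoted later in this thread; the values are illustrative only, not recommendations from the thread:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

public class BatchingSketch {
    public static void main(String[] args) {
        // Illustrative values only: larger batches and a larger socket buffer usually
        // help throughput for the 0.8.x async producer, at some cost in latency/memory.
        Properties props = new Properties();
        props.put("metadata.broker.list", "169.10.35.57:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("producer.type", "async");
        props.put("batch.num.messages", "10000");            // was 1200
        props.put("queue.buffering.max.messages", "100000"); // give the async queue headroom
        props.put("queue.buffering.max.ms", "500");
        props.put("send.buffer.bytes", "1048576");           // 10240 is very small for file data
        Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(props));
        producer.close();
    }
}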

Thanks,

Jun


On Tue, May 13, 2014 at 5:56 PM, Zhujie (zhujie, Smartcare) <
first.zhu...@huawei.com> wrote:

> Dear all,
>
> We want to use Kafka to collect and dispatch data files, but the
> performance may be lower than we need.
>
> In our cluster, there is a provider and a broker. We use one thread to read
> files from the local disk of the provider and send them to the broker. The
> average throughput is only 3 MB/s~4 MB/s.
> But if we just use the Java NIO API to send the file, the throughput can
> exceed 200 MB/s.
> Why is the Kafka performance so bad in our test? Are we missing something?
>
>
>
> Our server:
> Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
> Mem:300G
> Disk:600G 15K RPM SAS*8
>
> Configuration of provider:
> props.put("serializer.class", "kafka.serializer.NullEncoder");
> props.put("metadata.broker.list", "169.10.35.57:9092");
> props.put("request.required.acks", "0");
> props.put("producer.type", "async");//异步
> props.put("queue.buffering.max.ms","500");
> props.put("queue.buffering.max.messages","10");
> props.put("batch.num.messages", "1200");
> props.put("queue.enqueue.timeout.ms", "-1");
> props.put("send.buffer.bytes", "10240");
>
> Configuration of broker:
>
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
>
> # Server Basics #
>
> # The id of the broker. This must be set to a unique integer for each
> broker.
> broker.id=0
>
> # Socket Server Settings
> #
>
> # The port the socket server listens on
> port=9092
>
> # Hostname the broker will bind to. If not set, the server will bind to
> all interfaces
> #host.name=localhost
>
> # Hostname the broker will advertise to producers and consumers. If not
> set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value
> returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=
>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=
>
> # The number of threads handling network requests
> #num.network.threads=2
> # The number of threads doing disk I/O
> #num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> #socket.send.buffer.bytes=1048576
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> #socket.receive.buffer.bytes=1048576
>
> # The maximum size of a request that the socket server will accept
> (protection against OOM)
> #socket.request.max.bytes=104857600
>
>
> # Log Basics #
>
> # A comma separated list of directories under which to store log files
> log.dirs=/data/kafka-logs
>
> # The default number of log partitions per topic. More partitions allow
> greater
> # parallelism for consumption, but this will also result in more files
> across
> # the brokers.
> #num.partitions=2
>
> # Log Flush Policy
> #
>
> # Messages are immediately written to the filesystem but by default we
> only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of
> data to disk.
> # There are a few important trade-offs here:
> #1. Durability: Unflushed data may be lost if you are not using
> replication.
> #2. Latency: Very large flush intervals may lead to latency spikes
> when the flush does occur as there will be a lot of data to flush.
> #3. Throughput: The flush is generally the most expensive operation,
> and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data
> after a period of time or
> # every N messages (or both). This can be done globally and overridden on
> a per-topic basis.
>
> # The number of messages to accept before forcing a flush of data to disk
> #log.flush.interval.messages=1
>
> # The maximum amount of time a message can sit in a log before we force a
> flush
> #log.flush.interval.ms=1000
>
> 

[jira] [Updated] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1445:
-

Attachment: KAFKA-1445_2014-05-14_16:24:25.patch

> New Producer should send all partitions that have non-empty batches when one 
> of them is ready
> 
>
> Key: KAFKA-1445
> URL: https://issues.apache.org/jira/browse/KAFKA-1445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1445.patch, KAFKA-1445.patch, 
> KAFKA-1445_2014-05-13_11:25:13.patch, KAFKA-1445_2014-05-14_16:24:25.patch
>
>
> One difference between the new producer and the old producer is that in the 
> new producer the linger time is per partition, instead of global. Therefore, 
> when traffic is low, the sender will likely expire partitions one by one 
> and send lots of small requests containing only a few partitions with little 
> data, resulting in a largely increased request rate.
> One solution would be to let the sender select all partitions that have 
> non-empty batches when one of them is ready.
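
A rough sketch of the idea, with illustrative names rather than the actual RecordAccumulator code: when any one partition's batch is ready, drain every partition that has buffered data, so a single larger request is sent instead of many tiny per-partition requests.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "send all non-empty batches once one of them is ready".
class AccumulatorSketch {
    // partition id -> buffered records (placeholder type)
    private final Map<Integer, List<byte[]>> batches = new HashMap<Integer, List<byte[]>>();

    /** True if at least one partition's batch has waited past its linger time or is full. */
    boolean anyReady(long nowMs) {
        // ... real readiness check omitted in this sketch ...
        return true;
    }

    /** Drain every partition that has buffered data, not just the ready ones. */
    Map<Integer, List<byte[]>> drainAll() {
        Map<Integer, List<byte[]>> drained = new HashMap<Integer, List<byte[]>>();
        for (Map.Entry<Integer, List<byte[]>> e : batches.entrySet()) {
            if (!e.getValue().isEmpty()) {
                drained.put(e.getKey(), new ArrayList<byte[]>(e.getValue()));
                e.getValue().clear();
            }
        }
        return drained; // one larger request instead of many tiny per-partition requests
    }
}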



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1431) ConsoleConsumer - Option to clean zk consumer path

2014-05-14 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13993141#comment-13993141
 ] 

Sriharsha Chintalapani commented on KAFKA-1431:
---

Created reviewboard  against branch trunk

> ConsoleConsumer - Option to clean zk consumer path
> --
>
> Key: KAFKA-1431
> URL: https://issues.apache.org/jira/browse/KAFKA-1431
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1
> Environment: All
>Reporter: Jeremy A Laycock
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1431.patch
>
>
> Raised in response to KAFKA-1426. Currently the "from-beginning" option 
> auto-deletes the zk consumer path. This is confusing and unexpected behaviour. 
> Suggest a separate option to clean the console consumer path.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1446) Consumer metrics for rebalance

2014-05-14 Thread Clark Haskins (JIRA)
Clark Haskins created KAFKA-1446:


 Summary: Consumer metrics for rebalance
 Key: KAFKA-1446
 URL: https://issues.apache.org/jira/browse/KAFKA-1446
 Project: Kafka
  Issue Type: Improvement
Reporter: Clark Haskins


The Kafka consumer should have metrics around the number of seconds spent in 
rebalance over the last minute as well as the number of rebalances started 
during the previous minute.

The other important thing about these metrics is that they should only be 
updated once per minute; for example, the rebalance time should not increase from 
second to second during a rebalance.
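
One possible shape for such metrics, purely illustrative and not an existing Kafka metric: accumulate within a one-minute window and publish only when the window rolls over, so the reported values stay flat while a rebalance is still in progress.

// Illustrative sketch: accumulate rebalance stats and publish them once per minute.
class RebalanceMetricsSketch {
    private long windowStartMs = System.currentTimeMillis();
    private long rebalanceMsInWindow = 0;
    private int rebalancesStartedInWindow = 0;

    // Published values, updated only when the one-minute window rolls over.
    volatile long secondsInRebalanceLastMinute = 0;
    volatile int rebalancesStartedLastMinute = 0;

    synchronized void recordRebalance(long startMs, long endMs) {
        rebalanceMsInWindow += (endMs - startMs);
        rebalancesStartedInWindow++;
        maybeRoll(endMs);
    }

    private void maybeRoll(long nowMs) {
        if (nowMs - windowStartMs >= 60_000) {
            secondsInRebalanceLastMinute = rebalanceMsInWindow / 1000;
            rebalancesStartedLastMinute = rebalancesStartedInWindow;
            rebalanceMsInWindow = 0;
            rebalancesStartedInWindow = 0;
            windowStartMs = nowMs;
        }
    }
}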



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 21123: Patch for KAFKA-1437

2014-05-14 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21123/#review42514
---

Ship it!


Ship It!

- Neha Narkhede


On May 6, 2014, 5:56 p.m., Joel Koshy wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21123/
> ---
> 
> (Updated May 6, 2014, 5:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1437
> https://issues.apache.org/jira/browse/KAFKA-1437
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1437; Consumer metadata response should include (empty) coordinator 
> information if the coordinator is unavailable.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
> 6807f9806fb155f6cdb1aaf63c14ac8ec180d779 
>   core/src/main/scala/kafka/client/ClientUtils.scala 
> fc9e08423a4127e1d64be1e62def567ea9eb80a3 
>   core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 
> dfa9c42bcab39b56775fddedea1137a518349386 
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 
> c468419297d297c2f320ff6d6dbbc5cd42380aa2 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> d39a9a4bf89af493122b663f1378b9d01c9440ec 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
> 19a86355844fd11550c8771165a86b6731a20d26 
> 
> Diff: https://reviews.apache.org/r/21123/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>



Re: Review Request 21174: Fix KAFKA-1396

2014-05-14 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21174/#review42534
---



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala


Should we check that after shutting down the producer scheduler?



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala


waitUntilMetadata doesn't return the leader now. You will have to add it 
back.



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala


Could we use TestUtils.createNewProducer()?


- Jun Rao


On May 7, 2014, 9:56 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21174/
> ---
> 
> (Updated May 7, 2014, 9:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1396
> https://issues.apache.org/jira/browse/KAFKA-1396
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 1. Reduce bouncing test iterations from 5 to 2.
> 2. Double check metadata is propagated after the bouncing iterations and 
> before fetching data.
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> a993e8c7531abca6b82d1718ac57ba6a295c1fc7 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
> af11a4983599a20a33b6f1fc05738a59abfede63 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 034f36130a1d0669dda5ed53e9cee328806c811e 
> 
> Diff: https://reviews.apache.org/r/21174/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 19731: Patch for KAFKA-1328

2014-05-14 Thread Neha Narkhede


> On May 7, 2014, 4:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java,
> >  lines 37-38
> > 
> >
> > partitionId can probably be just partition to be consistent with what's 
> > in ProducerRecord.

It's probably going to require a change on the producer. See my comment on this 
rb previously -
"The returned object from partition() is TopicPartition on purpose. I realized 
that returning partition id from this API is useless since all other APIs in 
the consumer accept TopicPartition. The constructor parameter can be renamed to 
partitionId"


> On May 7, 2014, 4:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, 
> > lines 98-104
> > 
> >
> > If we can't think of a usage of this api, perhaps we should just remove 
> > it. The typical usage is that we want to seek to a previously committed 
> > offset. However, knowing the current fetch offset is of little use.

Please refer to the mailing list discussion on the requirement for this API. 
The subject was something like "New consumer API discussion".


> On May 7, 2014, 4:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java,
> >  lines 108-109
> > 
> >
> > The issue with not exposing a nextOffset() api is that users have to 
> > figure out the next offset themselves, which is not natural. The common 
> > usage is the app finishes consuming a record and wants to commit the next 
> > offset (not the current offset) after the consumed record. Having a 
> > nextOffset() will allow us to explain this to the user better in the api.

Let's discuss this more explicitly on the mailing list. I'm not opposed to 
exposing the API if most people feel the need for it. Make sure you give this 
feedback on the API discussion thread as well. I can make this change after the 
initial patch is in.


> On May 7, 2014, 4:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 53-63
> > 
> >
> > The returned offset should be the next offset. See the comment on 
> > exposing nextOffset().

Not really. processedOffsets here stores the offsets of records that the 
consumer has finished processing. So record.offset() seems correct, right?


> On May 7, 2014, 4:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java,
> >  lines 74-86
> > 
> >
> > Perhaps we just need to combine the two into one api 
> > topicAndPartition().

We could definitely add a topicAndPartition() API.


> On May 7, 2014, 4:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, 
> > lines 87-88
> > 
> >
> > The Future thing doesn't work well in this case. This is because the 
> > caller thread is also the one that does the polling. If the caller calls 
> > future.get, it will block forever since there won't be any polling so that 
> > we can get the response. So, we will likely have to make a separate 
> > blocking api.

Hmm.. even if we expose it as a separate API, it seems the problem you 
mentioned will not go away. Probably adding a callback is a better approach.
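
A sketch of what a callback-based commit could look like; the interfaces below are hypothetical and only illustrate the shape being discussed, not the actual consumer API.

import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hypothetical shape only. The callback is invoked from a later poll() on the
// caller's own thread, so the caller never blocks on a Future that it would
// itself have to drive by polling.
interface CommitCallback {
    void onComplete(Map<TopicPartition, Long> committedOffsets, Exception error);
}

interface AsyncCommitter {
    void commit(Map<TopicPartition, Long> offsets, CommitCallback callback);
}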


- Neha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/19731/#review42405
---


On May 5, 2014, 6:35 p.m., Neha Narkhede wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/19731/
> ---
> 
> (Updated May 5, 2014, 6:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1328
> https://issues.apache.org/jira/browse/KAFKA-1328
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Review comments from Jun and Guozhang
> 
> 
> Checked in ConsumerRecordMetadata
> 
> 
> Fixed the javadoc usage examples in KafkaConsumer to match the API changes
> 
> 
> Changed the signature of poll to return Map to 
> organize the ConsumerRecords around topic and then optionally around 
> partition. This will serve the group management as well as custom partition 
> subscription use cases
> 
> 
> 1. Changed the signature of poll() to return Map List> 2. Changed ConsumerRecord to throw an exception if an 
> error is detected for the partition. For example, if a single large m

kafka performance question

2014-05-14 Thread Zhujie (zhujie, Smartcare)
Dear all,

We want to use Kafka to collect and dispatch data files, but the performance may 
be lower than we need.

In our cluster, there is a provider and a broker. We use one thread to read files 
from the local disk of the provider and send them to the broker. The average 
throughput is only 3 MB/s~4 MB/s.
But if we just use the Java NIO API to send the file, the throughput can exceed 200 MB/s.
Why is the Kafka performance so bad in our test? Are we missing something?



Our server:
Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
Mem:300G
Disk:600G 15K RPM SAS*8

Configuration of provider:
props.put("serializer.class", "kafka.serializer.NullEncoder");
props.put("metadata.broker.list", "169.10.35.57:9092");
props.put("request.required.acks", "0");
props.put("producer.type", "async");//异步
props.put("queue.buffering.max.ms","500");
props.put("queue.buffering.max.messages","10");
props.put("batch.num.messages", "1200");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("send.buffer.bytes", "10240");

Configuration of broker:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

# Server Basics #

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Socket Server Settings 
#

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it 
uses the
# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=

# The number of threads handling network requests
#num.network.threads=2
# The number of threads doing disk I/O
#num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
#socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
#socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection 
against OOM)
#socket.request.max.bytes=104857600


# Log Basics #

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#num.partitions=2

# Log Flush Policy #

# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
# the OS cache lazily. The following configurations control the flush of data 
to disk.
# There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the 
flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data 
after a period of time or
# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=1

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

# Log Retention Policy #

# The following configurations control the disposal of log segments. The policy 
can
# be set to delete segments after a period of time, or after a given size has 
accumulated.
# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
# from the end of the log.

# The minimum age of a log file to b

Re: Review Request 21239: Patch for KAFKA-1431

2014-05-14 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21239/
---

(Updated May 14, 2014, 6:33 p.m.)


Review request for kafka.


Bugs: KAFKA-1431
https://issues.apache.org/jira/browse/KAFKA-1431


Repository: kafka


Description (updated)
---

KAFKA-1431. ConsoleConsumer - Option to clean zk consumer path; changed option to 
--delete-consumer-offsets. It checks if existing offsets for the given topic & 
groupId exist and throws an error if the user didn't specify 
--delete-consumer-offsets.


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsoleConsumer.scala 
0f62819be0562f62c0f778bd20ead053f01a6f2f 

Diff: https://reviews.apache.org/r/21239/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



Re: Max Message Size

2014-05-14 Thread Darion Yaphet
max.message.bytes

This is the largest message size Kafka will allow to be appended to this topic.
Note that if you increase this size you must also increase your consumer's
fetch size so they can fetch messages this large.

When the message length is larger than max.message.bytes, an exception will
probably be thrown, I think.
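
A small sketch of the settings that have to move together (values illustrative): the broker-side limit and the old consumer's fetch size, which must be at least as large as the biggest message. As far as I know the broker rejects an oversized message with an error rather than truncating it, and with request.required.acks=0 the producer may never see that error.

import java.util.Properties;

public class MessageSizeSketch {
    public static void main(String[] args) {
        // Illustrative values only (2 MB). The consumer's fetch size must be at
        // least as large as the biggest message it may need to read.
        Properties brokerProps = new Properties();
        brokerProps.put("message.max.bytes", "2097152");         // broker-wide limit (server.properties)

        Properties consumerProps = new Properties();
        consumerProps.put("fetch.message.max.bytes", "2097152"); // old consumer fetch size

        // Per-topic override (assumed CLI syntax):
        //   kafka-topics.sh --alter --topic my-topic --config max.message.bytes=2097152
    }
}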


2014-05-14 2:23 GMT+08:00 Bhavesh Mistry :

> Hi Kafka Team,
>
> Is there any message size limitation on the producer side? If there is, what
> happens to the message: does it get truncated or is the message lost?
>
> Thanks,
>
> Bhavesh
>



-- 


long is the way and hard  that out of Hell leads up to light


Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-14 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/#review42985
---



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java


I want to second Jun's comment. I think this method is far too complex. We 
need to find a way to simplify this.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java


Metadata updates cannot cannot cannot bleed into this class. This class 
isn't about updating metadata.



clients/src/main/java/org/apache/kafka/common/Cluster.java


There are two methods with the same name that do two different things. Can 
we make them something like

partitionsForTopic(topic)
and
partitionsForNode(node)
?


- Jay Kreps


On May 13, 2014, 6:25 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21398/
> ---
> 
> (Updated May 13, 2014, 6:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1445
> https://issues.apache.org/jira/browse/KAFKA-1445
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 0. Add the partitionsForNode index in Cluster.
> 1. Ready would return a list of ready nodes instead of partitions.
> 2. Ready would also check if there are any ready partitions with unknown
>    leader; if yes, indicate to processReadyNode to force a metadata refresh.
> 3. Drain would take a list of nodes and drain the batches per node until the
>    max request size is reached.
> 4. Collocate would now just transform batches per node into a producer request.
> 5. Corresponding unit test changes.
> 6. One minor compilation warning fix.
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> 426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  f37ab770b1794830154f9908a0156e7e99b4a458 
>   
> clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
> 1df226606fad29da47d81d0b8ff36209c3536c06 
> 
> Diff: https://reviews.apache.org/r/21398/diff/
> 
> 
> Testing
> ---
> 
> unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Commented] (KAFKA-1182) Topic not created if number of live brokers less than # replicas

2014-05-14 Thread Simon Cooper (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13997445#comment-13997445
 ] 

Simon Cooper commented on KAFKA-1182:
-

The current behaviour is causing problems for us - we've got an automated 
install script that creates several topics, and when creating lots of topics in 
sequence, Kafka has a tendency to bounce replicas, causing spurious failures in 
the install script. Ideally, topic creation of an under-replicated topic (both 
ad-hoc and using the kafka-topics.sh script) could be forced, so an automated 
install could complete, allowing under-replicated topics to be dealt with 
afterwards.

> Topic not created if number of live brokers less than # replicas
> 
>
> Key: KAFKA-1182
> URL: https://issues.apache.org/jira/browse/KAFKA-1182
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.0
> Environment: Centos 6.3
>Reporter: Hanish Bansal
>Assignee: Jun Rao
>
> We have a Kafka cluster of 2 nodes. (Using Kafka 0.8.0 version)
> Replication Factor: 2
> Number of partitions: 2
> Actual Behaviour:
> Out of two nodes, if any one node goes down then the topic is not created in 
> Kafka. 
> Steps to Reproduce:
> 1. Create a 2 node kafka cluster with replication factor 2
> 2. Start the Kafka cluster
> 3. Kill any one node
> 4.  Start the producer to write on a new topic
> 5. Observe the exception stated below:
> 2013-12-12 19:37:19 0 [WARN ] ClientUtils$ - Fetching topic metadata with
> correlation id 3 for topics [Set(test-topic)] from broker
> [id:0,host:122.98.12.11,port:9092] failed
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> Expected Behaviour: 
> In case the number of live brokers is less than the number of replicas:
> The topic should still be created so at least the live brokers can receive the data.
> They can replicate data to the other brokers once any down broker comes up.
> Because right now, when the number of live brokers is less than the number of
> replicas, there is a complete loss of data.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 21239: Patch for KAFKA-1431

2014-05-14 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21239/
---

(Updated May 14, 2014, 7:08 p.m.)


Review request for kafka.


Bugs: KAFKA-1431
https://issues.apache.org/jira/browse/KAFKA-1431


Repository: kafka


Description (updated)
---

KAFKA-1431. ConsoleConsumer - Option to clean zk consumer path; changed option to 
--delete-consumer-offsets. It checks if existing offsets for the given topic & 
groupId exist and throws an error if the user didn't specify 
--delete-consumer-offsets.


KAFKA-1431. ConsoleConsumer - Option to clean zk consumer path. Just checking 
consumers/groupId/offsets is good enough; removed the check for topicId under 
offsets.
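
A rough sketch, not the actual patch, of the check described above: look for the group's offsets path in ZooKeeper and refuse to run unless --delete-consumer-offsets was passed. The ZkClient calls are standard; the surrounding method and its names are illustrative.

import org.I0Itec.zkclient.ZkClient;

class OffsetPathCheckSketch {
    // Illustrative only: fail fast if offsets already exist for this group and
    // the user did not explicitly ask to delete them.
    static void checkExistingOffsets(ZkClient zkClient, String groupId, boolean deleteConsumerOffsets) {
        String offsetsPath = "/consumers/" + groupId + "/offsets";
        if (zkClient.exists(offsetsPath)) {
            if (!deleteConsumerOffsets)
                throw new IllegalArgumentException("Found previously committed offsets at " + offsetsPath
                        + "; pass --delete-consumer-offsets to remove them, or use a new group id.");
            zkClient.deleteRecursive(offsetsPath);
        }
    }
}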


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ConsoleConsumer.scala 
0f62819be0562f62c0f778bd20ead053f01a6f2f 

Diff: https://reviews.apache.org/r/21239/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1449) Extend wire protocol to allow CRC32C

2014-05-14 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13997681#comment-13997681
 ] 

Neha Narkhede commented on KAFKA-1449:
--

Interesting. Would you like to write up a proposal for the suggested changes? 
We can take it from there.

> Extend wire protocol to allow CRC32C
> 
>
> Key: KAFKA-1449
> URL: https://issues.apache.org/jira/browse/KAFKA-1449
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Albert Strasheim
>Assignee: Neha Narkhede
> Fix For: 0.9.0
>
>
> Howdy
> We are currently building out a number of Kafka consumers in Go, based on a 
> patched version of the Sarama library that Shopify released a while back.
> We have a reasonably fast serialization protocol (Cap'n Proto), a 10G network 
> and lots of cores. We have various consumers computing all kinds of 
> aggregates on a reasonably high volume access log stream (1.1e6 messages/sec 
> peak, about 500-600 bytes per message uncompressed).
> When profiling our consumer, our single hottest function (until we disabled 
> it), was the CRC32 checksum validation, since the deserialization and 
> aggregation in these consumers is pretty cheap.
> We believe things could be improved by extending the wire protocol to support 
> CRC-32C (Castagnoli), since SSE 4.2 has an instruction to accelerate its 
> calculation.
> https://en.wikipedia.org/wiki/SSE4#SSE4.2
> It might be hard to use from Java, but consumers written in most other 
> languages will benefit a lot.
> To give you an idea, here are some benchmarks for the Go CRC32 functions 
> running on a Intel(R) Core(TM) i7-3540M CPU @ 3.00GHz core:
> BenchmarkCrc32KB   90196 ns/op 363.30 MB/s
> BenchmarkCrcCastagnoli32KB 3404 ns/op 9624.42 MB/s
> I believe BenchmarkCrc32 written in C would do about 600-700 MB/sec, and the 
> CRC32-C speed should be close to what one achieves in Go.
> (Met Todd and Clark at the meetup last night. Thanks for the great 
> presentation!)
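
For reference, a small Java comparison of the two checksums. java.util.zip.CRC32 is what the current message format uses; CRC32C (the Castagnoli polynomial, hardware-accelerated via SSE 4.2) only appears in the JDK as java.util.zip.CRC32C in Java 9 and later, so the sketch below just illustrates the proposal rather than something the codebase of that era could use directly.

import java.util.zip.CRC32;
import java.util.zip.CRC32C; // JDK 9+; illustrative only
import java.util.zip.Checksum;

public class CrcCompare {
    static long checksum(Checksum c, byte[] data) {
        c.update(data, 0, data.length);
        return c.getValue();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[32 * 1024];        // 32KB block, as in the Go benchmarks above
        long crc = checksum(new CRC32(), payload);   // current wire-format checksum
        long crcC = checksum(new CRC32C(), payload); // proposed Castagnoli polynomial
        System.out.printf("CRC32=%d CRC32C=%d%n", crc, crcC);
    }
}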



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1431) ConsoleConsumer - Option to clean zk consumer path

2014-05-14 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1431:
--

Attachment: KAFKA-1431_2014-05-14_11:34:00.patch

> ConsoleConsumer - Option to clean zk consumer path
> --
>
> Key: KAFKA-1431
> URL: https://issues.apache.org/jira/browse/KAFKA-1431
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1
> Environment: All
>Reporter: Jeremy A Laycock
>Priority: Minor
>  Labels: newbie
> Attachments: KAFKA-1431.patch, KAFKA-1431_2014-05-14_11:34:00.patch
>
>
> Raised in response to KAFKA-1426. Currently option "from-beginning" auto 
> deletes the zk consumer path. This is confusing and unexpected behaviour. 
> Suggest a separate option to clean the console consumer path.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1450) check invalid leader in a more robust way

2014-05-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13997267#comment-13997267
 ] 

Jun Rao commented on KAFKA-1450:


Created reviewboard https://reviews.apache.org/r/21428/ against branch origin/trunk

> check invalid leader in a more robust way
> -
>
> Key: KAFKA-1450
> URL: https://issues.apache.org/jira/browse/KAFKA-1450
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Jun Rao
> Attachments: KAFKA-1450.patch
>
>
> In MetadataResponse, we only treat -1 as an invalid leader.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1451) Broker stuck due to leader election race

2014-05-14 Thread Maciek Makowski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciek Makowski updated KAFKA-1451:
---

Description: 
h3. Symptoms

The broker does not become available due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single kafka 0.8.1.1 node, single zookeeper 3.4.6 (but will likely behave 
the same with the ZK version included in Kafka distribution) node setup:

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. if the deletion of ephemeral {{/controller}} node 
associated with previous zookeeper session of the broker happens after 
subscription to changes in new session, election will be invoked twice, once 
from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.
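
An illustrative sketch of the suggested ordering; the real class is the Scala kafka.server.ZookeeperLeaderElector, and the names below are simplified. The point is to run the election before subscribing to data changes, so the delayed deletion of the stale /controller node cannot trigger a second, conflicting election.

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

// Illustrative Java sketch of the suggested fix: elect first, subscribe second.
class ElectorSketch {
    private final Object controllerLock = new Object();
    private final ZkClient zkClient;
    private final String electionPath = "/controller";
    private final IZkDataListener listener;

    ElectorSketch(ZkClient zkClient, IZkDataListener listener) {
        this.zkClient = zkClient;
        this.listener = listener;
    }

    void startup() {
        synchronized (controllerLock) {
            elect();                                                // write /controller first
            zkClient.subscribeDataChanges(electionPath, listener);  // then start watching it
        }
    }

    private void elect() { /* create the ephemeral /controller node */ }
}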

  was:
h3. Symptoms

The broker does not become available due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single kafka 0.8.1.1 node, single zookeeper 3.4.6 (but will likely behave 
the same with the ZK version included in Kafka distribution) node setup:

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. if the deletion of ephemeral {{/controller}} node 
associated with previous zookeeper session of the broker happens after 
subscription to changes in new session, election will be invoked twice, once 
from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.


> Broker stuck due to leader election race 
> -
>
> Key: KAFKA-1451
> URL: https://issues.apache.org/jira/browse/KAFKA-1451
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Maciek Makowski
>Priority: Minor
>
> h3. Symptoms
> The broker does not become available due to being stuck in an infinite loop 
> while electing leader. This can be recognised by the following line being 
> repeatedly written to server.log:
> {code}
> [2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
> [{"version":1,"brokerid":1,"timestamp":"1400060079108"}

Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-14 Thread Timothy Chen

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/#review42982
---



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java


Got it!


- Timothy Chen


On May 13, 2014, 6:25 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21398/
> ---
> 
> (Updated May 13, 2014, 6:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1445
> https://issues.apache.org/jira/browse/KAFKA-1445
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 0. Add the partitionsForNode index in Cluster.
> 1. Ready would return a list of ready nodes instead of partitions.
> 2. Ready would also check if there are any ready partitions with unknown
>    leader; if yes, indicate to processReadyNode to force a metadata refresh.
> 3. Drain would take a list of nodes and drain the batches per node until the
>    max request size is reached.
> 4. Collocate would now just transform batches per node into a producer request.
> 5. Corresponding unit test changes.
> 6. One minor compilation warning fix.
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> 426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  f37ab770b1794830154f9908a0156e7e99b4a458 
>   
> clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
> 1df226606fad29da47d81d0b8ff36209c3536c06 
> 
> Diff: https://reviews.apache.org/r/21398/diff/
> 
> 
> Testing
> ---
> 
> unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



Re: Review Request 21398: Fix KAFKA-1445 v2

2014-05-14 Thread Guozhang Wang


> On May 13, 2014, 6:37 p.m., Timothy Chen wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java,
> >  line 323
> > 
> >
> > Should we only call forceUpdate once if we get multiple unknown nodes?

The benefit of forcing a metadata update whenever there is a ready partition 
with an unknown leader is to minimize latency for these cases. Since partitions 
with unknown leaders (e.g. newly created topics, extended partitions, etc.) would 
be a rare case, I think this would not add too much load from metadata requests 
and is a good tradeoff for latency.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21398/#review42869
---


On May 13, 2014, 6:25 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21398/
> ---
> 
> (Updated May 13, 2014, 6:25 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1445
> https://issues.apache.org/jira/browse/KAFKA-1445
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> 0. Add the partitionsForNode index in Cluster.
> 1. Ready would return a list of ready nodes instead of partitions.
> 2. Ready would also check if there are any ready partitions with unknown
>    leader; if yes, indicate to processReadyNode to force a metadata refresh.
> 3. Drain would take a list of nodes and drain the batches per node until the
>    max request size is reached.
> 4. Collocate would now just transform batches per node into a producer request.
> 5. Corresponding unit test changes.
> 6. One minor compilation warning fix.
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  fbb732a57522109ac0e0eaf5c87b50cbd3a7f767 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  2d7e52d430fa267ee3689a06f8a621ce5dfd1e33 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> f0152fabbdd44e9f1a24291e84c17edf8379f4fc 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> 426bd1eec708979149cbd6fa3959e6f9e73c7e0e 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  f37ab770b1794830154f9908a0156e7e99b4a458 
>   
> clients/src/test/java/org/apache/kafka/common/utils/AbstractIteratorTest.java 
> 1df226606fad29da47d81d0b8ff36209c3536c06 
> 
> Diff: https://reviews.apache.org/r/21398/diff/
> 
> 
> Testing
> ---
> 
> unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Commented] (KAFKA-1393) I wrote this conflicted ephemeral node at /brokers/ids/199 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

2014-05-14 Thread Yongkun Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996052#comment-13996052
 ] 

Yongkun Wang commented on KAFKA-1393:
-

Check whether you have set a different broker.id for each broker.

> I wrote this conflicted ephemeral node  at /brokers/ids/199 a while back in a 
> different session, hence I will backoff for this node to be deleted by 
> Zookeeper and retry
> 
>
> Key: KAFKA-1393
> URL: https://issues.apache.org/jira/browse/KAFKA-1393
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0
>Reporter: zuolin
>Priority: Critical
>
> We're seeing the following log statements (over and over):
> [2014-04-14 14:25:43,304] INFO re-registering broker info in ZK for broker 
> 199 (kafka.server.KafkaZooKeeper)
> [2014-04-14 14:25:47,856] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:53,691] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:53,692] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:54,540] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:54,543] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:26:15,997] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:16,063] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:16,116] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,721] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,722] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,738] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,767] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:35,794] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:36,009] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:37,712] INFO Registered broker 199 at path /brokers/ids/199 
> with address 10.4.56.199:9092. (kafka.utils.ZkUtils$)
> [2014-04-14 14:26:37,789] INFO done re-registering broker 
> (kafka.server.KafkaZooKeeper)
> [2014-04-14 14:26:37,947] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaZooKeeper)
> [2014-04-14 14:26:38,179] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:39,158] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:26:40,398] INFO 199 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> [2014-04-14 14:26:43,332] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,332] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,358] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,473] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,701] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,994] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,995] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:44,076] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:44,377] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:50,002] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:50,874] INFO [Replica Manager on Broker 199]: Handling 
> LeaderAndIsr request 
> Name:LeaderAndIsrRequest;Version:0;Controller:199;ControllerEpoch:4;CorrelationId:4;ClientId:id_199-host_10.4.56.199-port_9092;PartitionState:(cacheMonitor-server,0)
>  -> 
> (LeaderAndIsrInfo:(Leader:199,ISR:199,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:199),(socket-server-4,0)
>  -> 
> (LeaderAndIsrInfo:(Leader:199,ISR:199,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:199),(push-server-3,0)
>  -> 
> (LeaderAndIsrInfo:(Leader:199,ISR:199,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:199),(socket-server-2,0)
>  -> 
> (LeaderAndIsrInfo:(Lea

[jira] [Updated] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1445:
-

Attachment: KAFKA-1445_2014-05-13_11:25:13.patch

> New Producer should send all partitions that have non-empty batches when one 
> of them is ready
> 
>
> Key: KAFKA-1445
> URL: https://issues.apache.org/jira/browse/KAFKA-1445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1445.patch, KAFKA-1445.patch, 
> KAFKA-1445_2014-05-13_11:25:13.patch
>
>
> One difference between the new producer and the old producer is that in the 
> new producer the linger time is per partition, instead of global. Therefore, 
> when traffic is low, the sender will likely expire partitions one by one 
> and send lots of small requests containing only a few partitions with little 
> data, resulting in a largely increased request rate.
> One solution would be to let the sender select all partitions that have 
> non-empty batches when one of them is ready.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 21428: Patch for KAFKA-1450

2014-05-14 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21428/#review42967
---



core/src/main/scala/kafka/api/FetchRequest.scala


Should isFromFollower be false if either isFromOrdinaryConsumer or 
isFromLowLevelConsumer is true?


- Guozhang Wang


On May 14, 2014, 4:40 a.m., Jun Rao wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21428/
> ---
> 
> (Updated May 14, 2014, 4:40 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1450
> https://issues.apache.org/jira/browse/KAFKA-1450
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Use Node.isIdValid() instead of testing against -1.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> 0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
>   
> clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
> 2652c32f123b3bc4b0456d4bc9fbba52c051724c 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> a8b73acd1a813284744359e8434cb52d22063c99 
>   core/src/main/scala/kafka/api/RequestOrResponse.scala 
> 57f87a48c5e87220e7f377b23d2bbfa0d16350dc 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 0b668f230c8556fdf08654ce522a11847d0bf39b 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 4da0f2c245f75ff0dcab4ecf0af085ab9f8da1bb 
> 
> Diff: https://reviews.apache.org/r/21428/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jun Rao
> 
>



[jira] [Created] (KAFKA-1450) check invalid leader in a more robust way

2014-05-14 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1450:
--

 Summary: check invalid leader in a more robust way
 Key: KAFKA-1450
 URL: https://issues.apache.org/jira/browse/KAFKA-1450
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao


In MetadataResponse, we only treat -1 as an invalid leader.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: kafka performance question

2014-05-14 Thread Guozhang Wang
Hi Zhujie,

When you use a single thread to read data from disk and send to Kafka
server, which client/API did you use?

Guozhang



On Tue, May 13, 2014 at 6:32 PM, Zhujie (zhujie, Smartcare) <
first.zhu...@huawei.com> wrote:

> our version is kafka_2.10-0.8.1
>
> From: Zhujie (zhujie, Smartcare)
> Sent: May 14, 2014 8:56
> To: 'us...@kafka.apache.org'; 'dev@kafka.apache.org'
> Subject: kafka performance question
>
> Dear all,
>
> We want to use Kafka to collect and dispatch data files, but the
> performance may be lower than we need.
>
> In our cluster, there is a provider and a broker. We use one thread to read
> files from the local disk of the provider and send them to the broker. The
> average throughput is only 3 MB/s~4 MB/s.
> But if we just use the Java NIO API to send the file, the throughput can
> exceed 200 MB/s.
> Why is the Kafka performance so bad in our test? Are we missing something?
>
>
>
> Our server:
> Cpu: Intel(R) Xeon(R) CPU E5-4650 0 @ 2.70GHz*4
> Mem:300G
> Disk:600G 15K RPM SAS*8
>
> Configuration of provider:
> props.put("serializer.class", "kafka.serializer.NullEncoder");
> props.put("metadata.broker.list", "169.10.35.57:9092");
> props.put("request.required.acks", "0");
> props.put("producer.type", "async");//异步
> props.put("queue.buffering.max.ms","500");
> props.put("queue.buffering.max.messages","10");
> props.put("batch.num.messages", "1200");
> props.put("queue.enqueue.timeout.ms", "-1");
> props.put("send.buffer.bytes", "10240");
>
> Configuration of broker:
>
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements.  See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership.
> # The ASF licenses this file to You under the Apache License, Version 2.0
> # (the "License"); you may not use this file except in compliance with
> # the License.  You may obtain a copy of the License at
> #
> #http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> # see kafka.server.KafkaConfig for additional details and defaults
>
> # Server Basics #
>
> # The id of the broker. This must be set to a unique integer for each
> broker.
> broker.id=0
>
> # Socket Server Settings
> #
>
> # The port the socket server listens on
> port=9092
>
> # Hostname the broker will bind to. If not set, the server will bind to
> all interfaces
> #host.name=localhost
>
> # Hostname the broker will advertise to producers and consumers. If not
> set, it uses the
> # value for "host.name" if configured.  Otherwise, it will use the value
> returned from
> # java.net.InetAddress.getCanonicalHostName().
> #advertised.host.name=
>
> # The port to publish to ZooKeeper for clients to use. If this is not set,
> # it will publish the same port that the broker binds to.
> #advertised.port=
>
> # The number of threads handling network requests
> #num.network.threads=2
>
> # The number of threads doing disk I/O
> #num.io.threads=8
>
> # The send buffer (SO_SNDBUF) used by the socket server
> #socket.send.buffer.bytes=1048576
>
> # The receive buffer (SO_RCVBUF) used by the socket server
> #socket.receive.buffer.bytes=1048576
>
> # The maximum size of a request that the socket server will accept
> (protection against OOM)
> #socket.request.max.bytes=104857600
>
>
> # Log Basics #
>
> # A comma separated list of directories under which to store log files
> log.dirs=/data/kafka-logs
>
> # The default number of log partitions per topic. More partitions allow
> greater
> # parallelism for consumption, but this will also result in more files
> across
> # the brokers.
> #num.partitions=2
>
> # Log Flush Policy
> #
>
> # Messages are immediately written to the filesystem but by default we
> only fsync() to sync
> # the OS cache lazily. The following configurations control the flush of
> data to disk.
> # There are a few important trade-offs here:
> #1. Durability: Unflushed data may be lost if you are not using
> replication.
> #2. Latency: Very large flush intervals may lead to latency spikes
> when the flush does occur as there will be a lot of data to flush.
> #3. Throughput: The flush is generally the most expensive operation,
> and a small flush interval may lead to excessive seeks.
> # The settings below allow one to configure the flush policy to flush data
> after a period of time or
> # every N messages (or both). This can be done globally and overridden on
> a per-topic basis.
>
> # The number of mes

[jira] [Updated] (KAFKA-1451) Broker stuck due to leader election race

2014-05-14 Thread Maciek Makowski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciek Makowski updated KAFKA-1451:
---

Description: 
h3. Symptoms

The broker does not become available due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single Kafka 0.8.1.1 node, single ZooKeeper 3.4.6 node setup (but it will 
likely behave the same with the ZK version included in the Kafka distribution):

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. If the deletion of the ephemeral {{/controller}} node 
associated with the broker's previous ZooKeeper session happens after the 
subscription to changes in the new session, the election will be invoked twice, 
once from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from a different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.
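
A minimal Java sketch of that reordering, for illustration only (the real class is 
the Scala {{ZookeeperLeaderElector}}; the field and method names below are stand-ins):

{code}
// Sketch only: write /controller before subscribing, so a deletion of the node left
// over from the broker's previous session cannot schedule a second elect().
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class ElectorStartupSketch {
    private final Object controllerLock = new Object();
    private final String electionPath = "/controller";
    private final ZkClient zkClient;
    private final IZkDataListener leaderChangeListener;

    public ElectorStartupSketch(ZkClient zkClient, IZkDataListener listener) {
        this.zkClient = zkClient;
        this.leaderChangeListener = listener;
    }

    public void startup() {
        synchronized (controllerLock) {
            elect();  // create the ephemeral /controller node first
            zkClient.subscribeDataChanges(electionPath, leaderChangeListener);
        }
    }

    private void elect() {
        // In the real elector this creates the ephemeral znode and handles conflicts;
        // it is elided here because only the ordering matters for this sketch.
    }
}
{code}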

  was:
h3. Symptoms

The broker does not become available, due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single Kafka 0.8.1.1 node, single ZooKeeper 3.4.6 node setup (but it will 
likely behave the same with the ZK version included in the Kafka distribution):

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. If the deletion of the ephemeral {{/controller}} node 
associated with the broker's previous ZooKeeper session happens after the 
subscription to changes in the new session, the election will be invoked twice, 
once from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from a different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.


> Broker stuck due to leader election race 
> -
>
> Key: KAFKA-1451
> URL: https://issues.apache.org/jira/browse/KAFKA-1451
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Maciek Makowski
>Priority: Minor
>
> h3. Symptoms
> The broker does not become available due to being stuck in an infinite loop 
> while electing leader. This can be recognised by the following line being 
> repeatedly written to server.log:
> {code}
> [2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
> [{"version":1,"brokerid":1,"timestamp":"1400060079108"}

Re: How can I step through the Kafka code using a debugger

2014-05-14 Thread Sheng Wang
Tim,

Thanks for the reply! I have followed the IDE setup instructions and tried
using IntelliJ to build Kafka.  I used java 1.7.0_55-b13 on Mac OS X
Mavericks. The "gradlew jar" commands etc. worked fine.  However, I cannot
use IntelliJ to build the project. I am using IntelliJ 13.1 to build the
project. Could anyone tell how I can solve this problem?

Thanks!

Information:Compilation completed with 38 errors and 7 warnings in 22 sec
Error:scalac: Error:
org.jetbrains.jps.incremental.scala.remote.ServerException
Error compiling sbt component 'compiler-interface-2.8.0.final-51.0'
at
sbt.compiler.AnalyzingCompiler$$anonfun$compileSources$1$$anonfun$apply$2.apply(AnalyzingCompiler.scala:145)
at
sbt.compiler.AnalyzingCompiler$$anonfun$compileSources$1$$anonfun$apply$2.apply(AnalyzingCompiler.scala:142)
at sbt.IO$.withTemporaryDirectory(IO.scala:285)
at
sbt.compiler.AnalyzingCompiler$$anonfun$compileSources$1.apply(AnalyzingCompiler.scala:142)
at
sbt.compiler.AnalyzingCompiler$$anonfun$compileSources$1.apply(AnalyzingCompiler.scala:139)
at sbt.IO$.withTemporaryDirectory(IO.scala:285)
at
sbt.compiler.AnalyzingCompiler$.compileSources(AnalyzingCompiler.scala:139)
at sbt.compiler.IC$.compileInterfaceJar(IncrementalCompiler.scala:33)
at
org.jetbrains.jps.incremental.scala.local.CompilerFactoryImpl$.org$jetbrains$jps$incremental$scala$local$CompilerFactoryImpl$$getOrCompileInterfaceJar(CompilerFactoryImpl.scala:86)
at
org.jetbrains.jps.incremental.scala.local.CompilerFactoryImpl$$anonfun$getScalac$1.apply(CompilerFactoryImpl.scala:43)
at
org.jetbrains.jps.incremental.scala.local.CompilerFactoryImpl$$anonfun$getScalac$1.apply(CompilerFactoryImpl.scala:42)
at scala.Option.map(Option.scala:145)
at
org.jetbrains.jps.incremental.scala.local.CompilerFactoryImpl.getScalac(CompilerFactoryImpl.scala:42)
at
org.jetbrains.jps.incremental.scala.local.CompilerFactoryImpl.createCompiler(CompilerFactoryImpl.scala:21)
at
org.jetbrains.jps.incremental.scala.local.CachingFactory$$anonfun$createCompiler$1.apply(CachingFactory.scala:23)
at
org.jetbrains.jps.incremental.scala.local.CachingFactory$$anonfun$createCompiler$1.apply(CachingFactory.scala:23)
at
org.jetbrains.jps.incremental.scala.local.Cache$$anonfun$getOrUpdate$2.apply(Cache.scala:20)
at scala.Option.getOrElse(Option.scala:120)
at
org.jetbrains.jps.incremental.scala.local.Cache.getOrUpdate(Cache.scala:19)
at
org.jetbrains.jps.incremental.scala.local.CachingFactory.createCompiler(CachingFactory.scala:22)
at
org.jetbrains.jps.incremental.scala.local.LocalServer.compile(LocalServer.scala:21)
at org.jetbrains.jps.incremental.scala.remote.Main$.make(Main.scala:64)
at org.jetbrains.jps.incremental.scala.remote.Main$.nailMain(Main.scala:22)
at org.jetbrains.jps.incremental.scala.remote.Main.nailMain(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.martiansoftware.nailgun.NGSession.run(NGSession.java:319)
Warning:scalac:
/var/folders/k2/_ksrnkxx6p502x6sb3v4jsvhgp/T/sbt_8d7d63cd/CompilerInterface.scala:161:
error: object creation impossible, since method registerTopLevelSym in
trait GlobalCompat of type (sym: this.Symbol)Unit is not defined
Warning:scalac:  new Compiler() with RangePositions // unnecessary in 2.11
Warning:scalac: ^
Warning:scalac:
/var/folders/k2/_ksrnkxx6p502x6sb3v4jsvhgp/T/sbt_8d7d63cd/CompilerInterface.scala:165:
error: class Compiler needs to be abstract, since method
registerTopLevelSym in trait GlobalCompat of type (sym:
Compiler.this.Symbol)Unit is not defined
Warning:scalac:  class Compiler extends CallbackGlobal(command.settings,
dreporter, output)
Warning:scalac:   ^
Warning:scalac: two errors found
/Users/aaa/git/kafka2/core/src/main/scala/kafka/tools/KafkaMigrationTool.java
Error:(21, 30) java: package kafka.javaapi.producer does not exist
Error:(22, 22) java: package kafka.producer does not exist
Error:(23, 22) java: package kafka.producer does not exist
Error:(24, 19) java: cannot find symbol
  symbol:   class Utils
  location: package kafka.utils
Error:(303, 39) java: cannot find symbol
  symbol:   class KeyedMessage
  location: class kafka.tools.KafkaMigrationTool.MigrationThread
Error:(310, 57) java: cannot find symbol
  symbol:   class KeyedMessage
  location: class kafka.tools.KafkaMigrationTool.MigrationThread
Error:(366, 39) java: cannot find symbol
  symbol:   class KeyedMessage
  location: class kafka.tools.KafkaMigrationTool.ProducerThread
Error:(367, 19) java: cannot find symbol
  symbol:   class Producer
  location: class kafka.tools.KafkaMigrationTool.ProducerThread
Error:(372, 13) java: cannot find symbol
  symbol:   class KeyedMessage
  location: class kafka.tools.KafkaMigrationTool.ProducerThread
Error:(374, 47) java: cannot find symbol
  

[jira] [Updated] (KAFKA-1445) New Producer should send all partitions that have non-empty batches when one of them is ready

2014-05-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1445:
-

Attachment: KAFKA-1445.patch

> New Producer should send all partitions that have non-empty batches when one 
> of them is ready
> 
>
> Key: KAFKA-1445
> URL: https://issues.apache.org/jira/browse/KAFKA-1445
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
> Fix For: 0.9.0
>
> Attachments: KAFKA-1445.patch, KAFKA-1445.patch
>
>
> One difference between the new producer and the old producer is that on the 
> new producer the linger time is per partition, instead of global. Therefore, 
> when the traffic is low, the sender will likely expire partitions one-by-one 
> and send lots of small requests containing only a few partitions with little 
> data, resulting in a largely increased request rate.
> One solution would be to let the sender select all partitions that have 
> non-empty batches when one of them is ready.
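
As a rough illustration of that selection rule (the {{Batch}} type below is a 
stand-in, not the actual producer internals):

{code}
// Hedged sketch of "drain every non-empty batch once any one of them is ready".
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class DrainAllWhenAnyReady {
    static final class Batch {
        final List<byte[]> records = new ArrayList<byte[]>();
        final long createdMs;
        Batch(long createdMs) { this.createdMs = createdMs; }
        boolean nonEmpty() { return !records.isEmpty(); }
        boolean lingerExpired(long nowMs, long lingerMs) { return nowMs - createdMs >= lingerMs; }
    }

    /** Returns every non-empty batch if at least one has waited out its linger time. */
    static List<Batch> drainable(Map<Integer, Batch> batchesByPartition, long nowMs, long lingerMs) {
        boolean anyReady = false;
        for (Batch b : batchesByPartition.values()) {
            if (b.nonEmpty() && b.lingerExpired(nowMs, lingerMs)) { anyReady = true; break; }
        }
        List<Batch> drained = new ArrayList<Batch>();
        if (anyReady) {
            for (Batch b : batchesByPartition.values()) {
                if (b.nonEmpty()) drained.add(b);  // piggyback the rest into the same send
            }
        }
        return drained;
    }
}
{code}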



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1447) Controlled shutdown deadlock when trying to send state updates

2014-05-14 Thread Sam Meder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13997524#comment-13997524
 ] 

Sam Meder commented on KAFKA-1447:
--

We'll be rolling out 0.8.1.1 soon, but since we can't reproduce this easily it 
is going to be hard to validate. I did look through the Jira issues 
related to controlled shutdown to see if this was already addressed and didn't 
see any that seemed to match this situation. I'll report back once we have run 
with 0.8.1.1 for a while.

> Controlled shutdown deadlock when trying to send state updates
> --
>
> Key: KAFKA-1447
> URL: https://issues.apache.org/jira/browse/KAFKA-1447
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Neha Narkhede
>
> We're seeing controlled shutdown indefinitely stuck on trying to send out 
> state change messages to the other brokers:
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
> completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> and stuck on:
> "kafka-request-handler-12" daemon prio=10 tid=0x7f1f04a66800 nid=0x6e79 
> waiting on condition [0x7f1ad5767000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x00078e91dc20> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> locked <0x00078e91dc38> (a java.lang.Object)
> at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> locked <0x00078b495af0> (a java.lang.Object)
> at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1451) Broker stuck due to leader election race

2014-05-14 Thread Maciek Makowski (JIRA)
Maciek Makowski created KAFKA-1451:
--

 Summary: Broker stuck due to leader election race 
 Key: KAFKA-1451
 URL: https://issues.apache.org/jira/browse/KAFKA-1451
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Maciek Makowski
Priority: Minor


h3. Symptoms

The broker does not become available, due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single Kafka 0.8.1.1 node, single ZooKeeper 3.4.6 node setup (but it will 
likely behave the same with the ZK version included in the Kafka distribution):

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. If the deletion of the ephemeral {{/controller}} node 
associated with the broker's previous ZooKeeper session happens after the 
subscription to changes in the new session, the election will be invoked twice, 
once from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from a different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-05-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996687#comment-13996687
 ] 

Jun Rao commented on KAFKA-1316:


Some of the failure handling seems to be common between the producer and the 
consumer. Could the logic of handling TopicMetadataRequest be factored into this 
RequestSelector class too?

> Refactor Sender
> ---
>
> Key: KAFKA-1316
> URL: https://issues.apache.org/jira/browse/KAFKA-1316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Jay Kreps
>Assignee: Jay Kreps
>
> Currently most of the logic of the producer I/O thread is in Sender.java.
> However we will need to do a fair number of similar things in the new 
> consumer. Specifically:
>  - Track in-flight requests
>  - Fetch metadata
>  - Manage connection lifecycle
> It may be possible to refactor some of this into a helper class that can be 
> shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1451) Broker stuck due to leader election race

2014-05-14 Thread Maciek Makowski (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciek Makowski updated KAFKA-1451:
---

Description: 
h3. Symptoms

The broker does not become available, due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single Kafka 0.8.1.1 node, single ZooKeeper 3.4.6 node setup (but it will 
likely behave the same with the ZK version included in the Kafka distribution):

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. If the deletion of the ephemeral {{/controller}} node 
associated with the broker's previous ZooKeeper session happens after the 
subscription to changes in the new session, the election will be invoked twice, 
once from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* zookeeper: delete {{/controller}} since the session that created it timed out
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from a different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.

  was:
h3. Symptoms

The broker does not become available, due to being stuck in an infinite loop 
while electing leader. This can be recognised by the following line being 
repeatedly written to server.log:

{code}
[2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}

h3. Steps to Reproduce

In a single Kafka 0.8.1.1 node, single ZooKeeper 3.4.6 node setup (but it will 
likely behave the same with the ZK version included in the Kafka distribution):

# start both zookeeper and kafka (in any order)
# stop zookeeper
# stop kafka
# start kafka
# start zookeeper

h3. Likely Cause

{{ZookeeperLeaderElector}} subscribes to data changes on startup, and then 
triggers an election. If the deletion of the ephemeral {{/controller}} node 
associated with the broker's previous ZooKeeper session happens after the 
subscription to changes in the new session, the election will be invoked twice, 
once from {{startup}} and once from {{handleDataDeleted}}:

* {{startup}}: acquire {{controllerLock}}
* {{startup}}: subscribe to data changes
* {{handleDataDeleted}}: {{/controller}} was deleted
* {{handleDataDeleted}}: wait on {{controllerLock}}
* {{startup}}: elect -- writes {{/controller}}
* {{startup}}: release {{controllerLock}}
* {{handleDataDeleted}}: acquire {{controllerLock}}
* {{handleDataDeleted}}: elect -- attempts to write {{/controller}} and then 
gets into infinite loop as a result of conflict

{{createEphemeralPathExpectConflictHandleZKBug}} assumes that the existing 
znode was written from a different session, which is not true in this case; it 
was written from the same session. That adds to the confusion.

h3. Suggested Fix

In {{ZookeeperLeaderElector.startup}} first run {{elect}} and then subscribe to 
data changes.


> Broker stuck due to leader election race 
> -
>
> Key: KAFKA-1451
> URL: https://issues.apache.org/jira/browse/KAFKA-1451
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1.1
>Reporter: Maciek Makowski
>Priority: Minor
>
> h3. Symptoms
> The broker does not become available, due to being stuck in an infinite loop 
> while electing leader. This can be recognised by the following line being 
> repeatedly written to server.log:
> {code}
> [2014-05-14 04:35:09,187] INFO I wrote this conflicted ephemeral node 
> [{"version":1,"brokerid":1,"timestamp":"1400060079108"}] at /controller a 
> while back in a different session, hence I will backoff 

[jira] [Updated] (KAFKA-1450) check invalid leader in a more robust way

2014-05-14 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1450:
---

Attachment: KAFKA-1450.patch

> check invalid leader in a more robust way
> -
>
> Key: KAFKA-1450
> URL: https://issues.apache.org/jira/browse/KAFKA-1450
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Jun Rao
> Attachments: KAFKA-1450.patch
>
>
> In MetadataResponse, we only treat -1 as an invalid leader.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Review Request 21428: Patch for KAFKA-1450

2014-05-14 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21428/
---

Review request for kafka.


Bugs: KAFKA-1450
https://issues.apache.org/jira/browse/KAFKA-1450


Repository: kafka


Description
---

Use Node.isIdValid() instead of testing against -1.
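
A tiny self-contained sketch of that idea (the real class is 
org.apache.kafka.common.Node; the "any negative id is invalid" rule below is an 
assumption for illustration, not the actual implementation):

{code}
// Encapsulate the "is this a real leader?" check on the node object instead of
// comparing ids to -1 at every call site.
final class NodeIdSketch {
    private final int id;
    NodeIdSketch(int id) { this.id = id; }
    boolean isIdValid() { return id >= 0; }  // assumption: any negative id means "no leader yet"
}
{code}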


Diffs
-

  clients/src/main/java/org/apache/kafka/common/Node.java 
0e47ff3ff0e055823ec5a5aa4839d25b0fac8374 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
2652c32f123b3bc4b0456d4bc9fbba52c051724c 
  core/src/main/scala/kafka/api/FetchRequest.scala 
a8b73acd1a813284744359e8434cb52d22063c99 
  core/src/main/scala/kafka/api/RequestOrResponse.scala 
57f87a48c5e87220e7f377b23d2bbfa0d16350dc 
  core/src/main/scala/kafka/server/KafkaApis.scala 
0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
4da0f2c245f75ff0dcab4ecf0af085ab9f8da1bb 

Diff: https://reviews.apache.org/r/21428/diff/


Testing
---


Thanks,

Jun Rao



[jira] Subscription: outstanding kafka patches

2014-05-14 Thread jira
Issue Subscription
Filter: outstanding kafka patches (90 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-1445  New Producer should send all partitions that have non-empty batches 
when one of them is ready
https://issues.apache.org/jira/browse/KAFKA-1445
KAFKA-1443  Add delete topic to topic commands and update DeleteTopicCommand
https://issues.apache.org/jira/browse/KAFKA-1443
KAFKA-1437  ConsumerMetadataResponse should always include coordinator 
information
https://issues.apache.org/jira/browse/KAFKA-1437
KAFKA-1431  ConsoleConsumer - Option to clean zk consumer path
https://issues.apache.org/jira/browse/KAFKA-1431
KAFKA-1396  fix transient unit test 
ProducerFailureHandlingTest.testBrokerFailure
https://issues.apache.org/jira/browse/KAFKA-1396
KAFKA-1394  Ensure last segment isn't deleted on expiration when there are 
unflushed messages
https://issues.apache.org/jira/browse/KAFKA-1394
KAFKA-1380  0.8.1.1 release candidate
https://issues.apache.org/jira/browse/KAFKA-1380
KAFKA-1372  Upgrade to Gradle 1.10
https://issues.apache.org/jira/browse/KAFKA-1372
KAFKA-1367  Broker topic metadata not kept in sync with ZooKeeper
https://issues.apache.org/jira/browse/KAFKA-1367
KAFKA-1351  String.format is very expensive in Scala
https://issues.apache.org/jira/browse/KAFKA-1351
KAFKA-1343  Kafka consumer iterator thread stalls
https://issues.apache.org/jira/browse/KAFKA-1343
KAFKA-1324  Debian packaging
https://issues.apache.org/jira/browse/KAFKA-1324
KAFKA-1308  Publish jar of test utilities to Maven
https://issues.apache.org/jira/browse/KAFKA-1308
KAFKA-1303  metadata request in the new producer can be delayed
https://issues.apache.org/jira/browse/KAFKA-1303
KAFKA-1300  Added WaitForReplaction admin tool.
https://issues.apache.org/jira/browse/KAFKA-1300
KAFKA-1235  Enable server to indefinitely retry on controlled shutdown
https://issues.apache.org/jira/browse/KAFKA-1235
KAFKA-1234  All kafka-run-class.sh to source in user config file (to set env 
vars like KAFKA_OPTS)
https://issues.apache.org/jira/browse/KAFKA-1234
KAFKA-1230  shell script files under bin don't work with cygwin (bash on 
windows)
https://issues.apache.org/jira/browse/KAFKA-1230
KAFKA-1215  Rack-Aware replica assignment option
https://issues.apache.org/jira/browse/KAFKA-1215
KAFKA-1207  Launch Kafka from within Apache Mesos
https://issues.apache.org/jira/browse/KAFKA-1207
KAFKA-1206  allow Kafka to start from a resource negotiator system
https://issues.apache.org/jira/browse/KAFKA-1206
KAFKA-1194  The kafka broker cannot delete the old log files after the 
configured time
https://issues.apache.org/jira/browse/KAFKA-1194
KAFKA-1190  create a draw performance graph script
https://issues.apache.org/jira/browse/KAFKA-1190
KAFKA-1180  WhiteList topic filter gets a NullPointerException on complex Regex
https://issues.apache.org/jira/browse/KAFKA-1180
KAFKA-1179  createMessageStreams() in javaapi.ZookeeperConsumerConnector does 
not throw
https://issues.apache.org/jira/browse/KAFKA-1179
KAFKA-1173  Using Vagrant to get up and running with Apache Kafka
https://issues.apache.org/jira/browse/KAFKA-1173
KAFKA-1150  Fetch on a replicated topic does not return as soon as possible
https://issues.apache.org/jira/browse/KAFKA-1150
KAFKA-1147  Consumer socket timeout should be greater than fetch max wait
https://issues.apache.org/jira/browse/KAFKA-1147
KAFKA-1145  Broker fail to sync after restart
https://issues.apache.org/jira/browse/KAFKA-1145
KAFKA-1144  commitOffsets can be passed the offsets to commit
https://issues.apache.org/jira/browse/KAFKA-1144
KAFKA-1130  "log.dirs" is a confusing property name
https://issues.apache.org/jira/browse/KAFKA-1130
KAFKA-1109  Need to fix GC log configuration code, not able to override 
KAFKA_GC_LOG_OPTS
https://issues.apache.org/jira/browse/KAFKA-1109
KAFKA-1106  HighwaterMarkCheckpoint failure puting broker into a bad state
https://issues.apache.org/jira/browse/KAFKA-1106
KAFKA-1093  Log.getOffsetsBefore(t, …) does not return the last confirmed 
offset before t
https://issues.apache.org/jira/browse/KAFKA-1093
KAFKA-1086  Improve GetOffsetShell to find metadata automatically
https://issues.apache.org/jira/browse/KAFKA-1086
KAFKA-1082  zkclient dies after UnknownHostException in zk reconnect
https://issues.apache.org/jira/browse/KAFKA-1082
KAFKA-1049  Encoder implementations are required to provide an undocumented 
constructor.
https://issues.apache.org/jira/browse/KAFKA-1049
KAFKA-1025  Producer.send should provide recoverability inf

[jira] [Comment Edited] (KAFKA-1393) I wrote this conflicted ephemeral node at /brokers/ids/199 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and

2014-05-14 Thread Yongkun Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13996052#comment-13996052
 ] 

Yongkun Wang edited comment on KAFKA-1393 at 5/13/14 4:51 AM:
--

Check whether you have set different broker.id for each broker.


was (Author: yongkun):
Check whether have set different broker.id for each broker.

> I wrote this conflicted ephemeral node  at /brokers/ids/199 a while back in a 
> different session, hence I will backoff for this node to be deleted by 
> Zookeeper and retry
> 
>
> Key: KAFKA-1393
> URL: https://issues.apache.org/jira/browse/KAFKA-1393
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0
>Reporter: zuolin
>Priority: Critical
>
> We're seeing the following log statements (over and over):
> [2014-04-14 14:25:43,304] INFO re-registering broker info in ZK for broker 
> 199 (kafka.server.KafkaZooKeeper)
> [2014-04-14 14:25:47,856] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:53,691] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:53,692] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:54,540] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:25:54,543] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:26:15,997] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:16,063] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:16,116] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,721] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,722] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,738] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:19,767] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:35,794] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:36,009] INFO Closing socket connection to /10.4.59.170. 
> (kafka.network.Processor)
> [2014-04-14 14:26:37,712] INFO Registered broker 199 at path /brokers/ids/199 
> with address 10.4.56.199:9092. (kafka.utils.ZkUtils$)
> [2014-04-14 14:26:37,789] INFO done re-registering broker 
> (kafka.server.KafkaZooKeeper)
> [2014-04-14 14:26:37,947] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaZooKeeper)
> [2014-04-14 14:26:38,179] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:39,158] INFO Closing socket connection to /10.4.56.199. 
> (kafka.network.Processor)
> [2014-04-14 14:26:40,398] INFO 199 successfully elected as leader 
> (kafka.server.ZookeeperLeaderElector)
> [2014-04-14 14:26:43,332] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,332] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,358] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,473] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,701] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,994] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:43,995] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:44,076] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:44,377] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:50,002] INFO Closing socket connection to /10.4.56.246. 
> (kafka.network.Processor)
> [2014-04-14 14:26:50,874] INFO [Replica Manager on Broker 199]: Handling 
> LeaderAndIsr request 
> Name:LeaderAndIsrRequest;Version:0;Controller:199;ControllerEpoch:4;CorrelationId:4;ClientId:id_199-host_10.4.56.199-port_9092;PartitionState:(cacheMonitor-server,0)
>  -> 
> (LeaderAndIsrInfo:(Leader:199,ISR:199,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:199),(socket-server-4,0)
>  -> 
> (LeaderAndIsrInfo:(Leader:199,ISR:199,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:199),(push-server-3,0)
>  -> 
> (LeaderAndIsrInfo:(Lea

Max Message Size

2014-05-14 Thread Bhavesh Mistry
Hi Kafka Team,

Is there any message size limitation on the producer side? If there is, what
happens to the message: does it get truncated, or is it lost?

Thanks,

Bhavesh


Re: How can I step through the Kafka code using a debugger

2014-05-14 Thread Timothy Chen
Hi Sheng Wang,

If you look at the Kafka website, there is already an IDE setup page you
can follow.

Once your IDE is setup you can simply run Kafka via the Kafka main
method and set breakpoints in the broker.

Tim

On Tue, May 13, 2014 at 11:44 PM, Sheng Wang  wrote:
> Hi,
>
> I want to learn more about the Kafka code base. One of the easiest ways
> that I can think of is to walk through the code with a debugger. Could
> anyone tell how I can do that? Can I do it using any IDE?
>
> Thanks!
>
> -Sheng
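
For reference, one concrete way to follow Tim's suggestion is to start the broker
in-process from a small main class and run it from the IDE in debug mode, so
breakpoints anywhere in the broker code are hit. This is only a sketch against the
0.8.x class names (kafka.server.KafkaServerStartable / KafkaConfig); adjust the
properties for your environment:

import java.util.Properties;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;

// Launch a single broker inside this JVM so an IDE debugger can step into it.
public class DebugBroker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("broker.id", "0");
        props.put("port", "9092");
        props.put("log.dirs", "/tmp/kafka-debug-logs");
        props.put("zookeeper.connect", "localhost:2181");  // assumes a local ZooKeeper

        KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(props));
        broker.startup();       // set breakpoints in broker code before stepping past this
        broker.awaitShutdown();
    }
}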