[jira] [Created] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)
Joel Koshy created KAFKA-914:


 Summary: Deadlock between initial rebalance and watcher-triggered 
rebalances
 Key: KAFKA-914
 URL: https://issues.apache.org/jira/browse/KAFKA-914
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Joel Koshy
 Fix For: 0.8


Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
code is very complex so it's a bit hard to articulate the following very
clearly. I will try and describe the sequence that results in a deadlock
when starting up a large number of consumers at around the same time:

- When a consumer's createMessageStream method is called, it initiates an
  initial inline rebalance.
- However, before the above initial rebalance actually begins, a ZK watch
  may trigger (due to some other consumers starting up) and initiate a
  rebalance. This happens successfully so fetchers start and start filling
  up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance
  attempt tries to close the fetchers. Before the fetchers are stopped, we
  shutdown the leader-finder-thread to prevent new fetchers from being
  started.
- The shutdown is accomplished by interrupting the leader-finder-thread and
  then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to
  process and tries to add a fetcher for it, it will get an exception
  (InterruptedException if acquiring the partitionMapLock or
  ClosedByInterruptException if performing an offset request). If we get an
  InterruptedException the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have mu
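
For illustration, here is a condensed Scala sketch of the interrupt-then-await-latch
shutdown pattern described above. It is not the actual kafka.utils.ShutdownableThread
code; the names and structure are only meant to show why a swallowed interrupt can
leave the shutdown caller parked on the latch forever.

import java.util.concurrent.CountDownLatch

abstract class ShutdownableWorker(name: String) extends Thread(name) {
  @volatile protected var isRunning = true
  private val shutdownLatch = new CountDownLatch(1)

  // One unit of work; may block on locks or sockets and may see
  // InterruptedException / ClosedByInterruptException when interrupted.
  def doWork(): Unit

  override def run(): Unit = {
    try {
      while (isRunning) doWork()
      // Hazard: if doWork() catches the InterruptedException internally
      // (which clears the interrupt flag) and then blocks again, for example
      // on a lock held by the thread that called shutdown(), control never
      // reaches the finally block and the latch is never counted down.
    } finally {
      shutdownLatch.countDown()
    }
  }

  def shutdown(): Unit = {
    isRunning = false
    interrupt()            // wake the worker out of a blocking call
    shutdownLatch.await()  // the rebalancing thread parks here (see the thread dump in the comments below)
  }
}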

[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663063#comment-13663063
 ] 

Jun Rao commented on KAFKA-901:
---

Thanks for the followup patch. Some comments:

60. KafkaApis: The following logging logs the whole request for each partition. 
This will probably pollute the log. Is it enough just to log the whole request 
once?
if(stateChangeLogger.isTraceEnabled)
  updateMetadataRequest.partitionStateInfos.foreach(p =>
    stateChangeLogger.trace(("Broker %d handling UpdateMetadata request %s " +
      "correlation id %d received from controller %d epoch %d for partition %s")
      .format(brokerId, p._2, updateMetadataRequest.correlationId,
        updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, p._1)))
Is the following logging necessary? If we know a request, we already know what 
should be in the cache after processing the request.
if(stateChangeLogger.isTraceEnabled)
  stateChangeLogger.trace(("Broker %d caching leader info %s for partition %s " +
    "in response to UpdateMetadata request sent by controller %d epoch %d with correlation id %d")
    .format(brokerId, partitionState._2, partitionState._1,
      updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
      updateMetadataRequest.correlationId))
  }
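
One way to restructure along the lines suggested above (a sketch only, not the
committed followup patch) would be to log the request-level fields once and keep
the per-partition trace lines minimal:

if (stateChangeLogger.isTraceEnabled) {
  stateChangeLogger.trace("Broker %d handling UpdateMetadata request with correlation id %d from controller %d epoch %d"
    .format(brokerId, updateMetadataRequest.correlationId,
      updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch))
  updateMetadataRequest.partitionStateInfos.foreach { case (partition, stateInfo) =>
    stateChangeLogger.trace("Broker %d caching leader info %s for partition %s"
      .format(brokerId, stateInfo, partition))
  }
}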




> Kafka server can become unavailable if clients send several metadata requests
> -
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
> Attachments: kafka-901-followup.patch, kafka-901.patch, 
> kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, 
> metadata-request-improvement.patch
>
>
> Currently, if a broker is bounced without controlled shutdown and there are 
> several clients talking to the Kafka cluster, each of the clients realize the 
> unavailability of leaders for some partitions. This leads to several metadata 
> requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
> all the I/O threads quickly become busy serving the metadata requests. This 
> leads to a full request queue, that stalls handling of finished responses 
> since the same network thread handles requests as well as responses. In this 
> situation, clients timeout on metadata requests and send more metadata 
> requests. This quickly makes the Kafka cluster unavailable. 
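
For context, the logging being reviewed above reflects the approach of the fix:
the broker caches per-partition leader info pushed by the controller through
UpdateMetadata requests, so that TopicMetadata requests can be answered from
memory instead of doing expensive work on the I/O threads. A minimal,
hypothetical sketch of such a cache (names are illustrative, not the actual
KafkaApis code):

import scala.collection.mutable

case class PartitionStateSketch(leader: Int, isr: Seq[Int], replicas: Seq[Int])

class MetadataCacheSketch {
  private val lock = new Object
  private val cache = mutable.Map.empty[(String, Int), PartitionStateSketch]

  // Called while handling an UpdateMetadata request from the controller.
  def update(topic: String, partition: Int, state: PartitionStateSketch): Unit =
    lock.synchronized { cache((topic, partition)) = state }

  // Called while handling a TopicMetadata request from a client.
  def get(topic: String, partition: Int): Option[PartitionStateSketch] =
    lock.synchronized { cache.get((topic, partition)) }
}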

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-21 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-901:


Attachment: kafka-901-followup2.patch

I agree. Kept only the 2nd logging message about caching the leader info.

Also, both log messages logged only the partition leader info, not the whole 
request. 

Fixed another exception in ControllerChannelManager 

> Kafka server can become unavailable if clients send several metadata requests
> -
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
> Attachments: kafka-901-followup2.patch, kafka-901-followup.patch, 
> kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, 
> metadata-request-improvement.patch
>
>
> Currently, if a broker is bounced without controlled shutdown and there are 
> several clients talking to the Kafka cluster, each of the clients realize the 
> unavailability of leaders for some partitions. This leads to several metadata 
> requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
> all the I/O threads quickly become busy serving the metadata requests. This 
> leads to a full request queue, that stalls handling of finished responses 
> since the same network thread handles requests as well as responses. In this 
> situation, clients timeout on metadata requests and send more metadata 
> requests. This quickly makes the Kafka cluster unavailable. 



[jira] [Commented] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663093#comment-13663093
 ] 

Jun Rao commented on KAFKA-913:
---

Did you try this on the latest of the 0.8 branch? What OS did you run it on?
When I ran it, it worked and gave me the following. This is actually a required
step; otherwise, the scripts under bin won't work.

./sbt assembly-package-dependency 
[info] Loading project definition from 
/Users/jrao/intellij_workspace/kafka_git/project
[info] Set current project to Kafka (in build 
file:/Users/jrao/intellij_workspace/kafka_git/)
[info] Compiling 2 Scala sources to 
/Users/jrao/intellij_workspace/kafka_git/core/target/scala-2.8.0/classes...
[info] Including snappy-java-1.0.4.1.jar
[info] Including zookeeper-3.3.4.jar
[info] Including metrics-core-2.2.0.jar
[info] Including zkclient-0.2.jar
[info] Including metrics-annotation-2.2.0.jar
[info] Including log4j-1.2.15.jar
[info] Including scala-compiler.jar
[info] Including slf4j-api-1.7.2.jar
[info] Including slf4j-simple-1.6.4.jar
[info] Including jopt-simple-3.2.jar
[info] Including scala-library.jar
[warn] Merging 'META-INF/NOTICE' with strategy 'rename'
[warn] Merging 'org/xerial/snappy/native/README' with strategy 'rename'
[warn] Merging 'META-INF/maven/org.xerial.snappy/snappy-java/LICENSE' with 
strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF/LICENSE' with strategy 'rename'
[warn] Merging 'META-INF/MANIFEST.MF' with strategy 'discard'
[warn] Strategy 'discard' was applied to a file
[warn] Strategy 'rename' was applied to 5 files
[success] Total time: 43 s, completed May 21, 2013 9:32:30 AM


> [Documentation] 0.8 Quickstart  mentions sbt assembly-package-dependency - 
> not needed
> -
>
> Key: KAFKA-913
> URL: https://issues.apache.org/jira/browse/KAFKA-913
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8
> Environment: 0.8 Git Revision 731ba90
>Reporter: Martin Eigenbrodt
>
> https://kafka.apache.org/08/quickstart.html says:
> > ./sbt update
> > ./sbt package
> > ./sbt assembly-package-dependency
> but  assembly-package-dependency fails and is actually not needed to run the 
> rest of the code.



[jira] [Resolved] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-912.
---

Resolution: Duplicate

> [Documentation] 0.8 Quickstart  mentions sbt assembly-package-dependency - 
> not needed
> -
>
> Key: KAFKA-912
> URL: https://issues.apache.org/jira/browse/KAFKA-912
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8
> Environment: 0.8 Git Revision 731ba90
>Reporter: Martin Eigenbrodt
>
> https://kafka.apache.org/08/quickstart.html says:
> > ./sbt update
> > ./sbt package
> > ./sbt assembly-package-dependency
> but  assembly-package-dependency fails and is actually not needed to run the 
> rest of the code.



[jira] [Closed] (KAFKA-912) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-912.
-


> [Documentation] 0.8 Quickstart  mentions sbt assembly-package-dependency - 
> not needed
> -
>
> Key: KAFKA-912
> URL: https://issues.apache.org/jira/browse/KAFKA-912
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8
> Environment: 0.8 Git Revision 731ba90
>Reporter: Martin Eigenbrodt
>
> https://kafka.apache.org/08/quickstart.html says:
> > ./sbt update
> > ./sbt package
> > ./sbt assembly-package-dependency
> but  assembly-package-dependency fails and is actually not needed to run the 
> rest of the code.



[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663118#comment-13663118
 ] 

Jun Rao commented on KAFKA-901:
---

Thanks for the second followup patch. +1.

> Kafka server can become unavailable if clients send several metadata requests
> -
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
> Attachments: kafka-901-followup2.patch, kafka-901-followup.patch, 
> kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, 
> metadata-request-improvement.patch
>
>
> Currently, if a broker is bounced without controlled shutdown and there are 
> several clients talking to the Kafka cluster, each of the clients realize the 
> unavailability of leaders for some partitions. This leads to several metadata 
> requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
> all the I/O threads quickly become busy serving the metadata requests. This 
> leads to a full request queue, that stalls handling of finished responses 
> since the same network thread handles requests as well as responses. In this 
> situation, clients timeout on metadata requests and send more metadata 
> requests. This quickly makes the Kafka cluster unavailable. 



produce request wire format question

2013-05-21 Thread Dave Peterson
In the version 0.8 wire format for a produce request, does a value of -1
still indicate "use a random partition" as it did for 0.7?

Thanks,
Dave


[jira] [Commented] (KAFKA-901) Kafka server can become unavailable if clients send several metadata requests

2013-05-21 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663131#comment-13663131
 ] 

Neha Narkhede commented on KAFKA-901:
-

Thanks for the quick review, committed it

> Kafka server can become unavailable if clients send several metadata requests
> -
>
> Key: KAFKA-901
> URL: https://issues.apache.org/jira/browse/KAFKA-901
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Blocker
> Attachments: kafka-901-followup2.patch, kafka-901-followup.patch, 
> kafka-901.patch, kafka-901-v2.patch, kafka-901-v4.patch, kafka-901-v5.patch, 
> metadata-request-improvement.patch
>
>
> Currently, if a broker is bounced without controlled shutdown and there are 
> several clients talking to the Kafka cluster, each of the clients realize the 
> unavailability of leaders for some partitions. This leads to several metadata 
> requests sent to the Kafka brokers. Since metadata requests are pretty slow, 
> all the I/O threads quickly become busy serving the metadata requests. This 
> leads to a full request queue, that stalls handling of finished responses 
> since the same network thread handles requests as well as responses. In this 
> situation, clients timeout on metadata requests and send more metadata 
> requests. This quickly makes the Kafka cluster unavailable. 



[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146
 ] 

Joel Koshy commented on KAFKA-914:
--

One more point: [td3] above does not need to originate from a watcher-triggered 
rebalance. The initial rebalance can also run into the same deadlock. i.e., as 
long as one or more watcher-triggered rebalances succeed and start fetchers 
prior to the initial rebalance, we may end up in this wedged state. E.g., on 
another instance I saw [td3] but on the main thread:

2013-05-21_17:07:14.34308 "main" prio=10 tid=0x7f5e34008000 nid=0x4e49 
waiting on condition [0x7f5e3b41]
2013-05-21_17:07:14.34308java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309   at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309   - parking to wait for  <0x7f5d36d99fa0> (a 
java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309   at 
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310   at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311   at 
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312   at 
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315   at 
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316   at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316   - locked <0x7f5d36d4b2e0> (a 
java.lang.Object)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317   at 
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318   at 
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318   at 
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319   at 
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320   at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320   at 
scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321   at 
kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322   at 
kafka.tools.MirrorMaker.main(MirrorMaker.scala)


> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
>

Re: produce request wire format question

2013-05-21 Thread Neha Narkhede
No. In 0.8, if you don't specify a key for a message, it is sent to any of
the available partitions. In other words, the partition is selected on the
producer side and the server doesn't get -1 as the partition id.

Thanks,
Neha


On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote:

> In the version 0.8 wire format for a produce request, does a value of -1
> still indicate "use a random partition" as it did for 0.7?
>
> Thanks,
> Dave
>


[jira] [Resolved] (KAFKA-913) [Documentation] 0.8 Quickstart mentions sbt assembly-package-dependency - not needed

2013-05-21 Thread Martin Eigenbrodt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martin Eigenbrodt resolved KAFKA-913.
-

Resolution: Invalid

Indeed. I mixed up the git branches and tried it on "trunk". On 0.8 it works
as advertised.
I apologize for bothering you.

> [Documentation] 0.8 Quickstart  mentions sbt assembly-package-dependency - 
> not needed
> -
>
> Key: KAFKA-913
> URL: https://issues.apache.org/jira/browse/KAFKA-913
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Affects Versions: 0.8
> Environment: 0.8 Git Revision 731ba90
>Reporter: Martin Eigenbrodt
>
> https://kafka.apache.org/08/quickstart.html says:
> > ./sbt update
> > ./sbt package
> > ./sbt assembly-package-dependency
> but  assembly-package-dependency fails and is actually not needed to run the 
> rest of the code.



Re: produce request wire format question

2013-05-21 Thread Dave Peterson
I'm looking at the document entitled "A Guide to the Kafka Protocol"
located here:

https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html

It shows a produce request as containing a number of message sets, which are
grouped first by topic and second by partition (a 32-bit integer).
However, each
message in a message set contains a Key field, which is described as follows:

The key is an optional message key that was used for partition assignment.
The key can be null.

I notice the use of "was" (past tense) above.  That seems to suggest that the
Key field was once used to specify a partition (at the granularity of each
individual message), but the plan for the future is to instead use the 32-bit
partition value preceding each message set.  Is this correct?  If so, when I am
creating a produce request for 0.8, what should I use for the 32-bit partition
value, and how does this relate to the Key field of each individual message?
Ideally, I would like to just send a produce request and let the broker choose
the partition.  How do I accomplish this in 0.8, and are there plans to change
this after 0.8?

Thanks,
Dave

On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede  wrote:
> No. In 0.8, if you don't specify a key for a message, it is sent to any of
> the available partitions. In other words, the partition id is selected on
> the partition and the server doesn't get -1 as the partition id.
>
> Thanks,
> Neha
>
>
> On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote:
>
>> In the version 0.8 wire format for a produce request, does a value of -1
>> still indicate "use a random partition" as it did for 0.7?
>>
>> Thanks,
>> Dave
>>
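
A rough Scala sketch of the request nesting described above (field names are
illustrative, not the exact wire-format identifiers, and other header fields of
the 0.8 produce request are omitted):

case class Message(key: Option[Array[Byte]],      // optional key, null on the wire when absent
                   value: Array[Byte])
case class PartitionMessageSet(partition: Int,    // the 32-bit partition id for this message set
                               messages: Seq[Message])
case class TopicData(topic: String,
                     partitionSets: Seq[PartitionMessageSet])
case class ProduceRequestSketch(topics: Seq[TopicData])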


Re: produce request wire format question

2013-05-21 Thread Colin Blower
The key is used by the client to decide which partition to send the
message to. By the time the client is creating the produce request, it
should be known which partition each message is being sent to. I believe
Neha described the behavior of the Java client which sends messages with
a null key to any partition.

The key is described in past tense because of the use case for
persisting keys with messages. The key is persisted through the broker
so that a consumer knows what key was used to partition the message on
the producer side.

I don't believe that you can have the broker decide which partition a
message goes to.

--
Colin B.

On 05/21/2013 11:48 AM, Dave Peterson wrote:
> I'm looking at the document entitled "A Guide to the Kafka Protocol"
> located here:
>
> https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html
>
> It shows a produce request as containing a number of message sets, which are
> grouped first by topic and second by partition (a 32-bit integer).
> However, each
> message in a message set contains a Key field, which is described as follows:
>
> The key is an optional message key that was used for partition assignment.
> The key can be null.
>
> I notice the use of "was" (past tense) above.  That seems to suggest that the
> Key field was once used to specify a partition (at the granularity of each
> individual message), but the plan for the future is to instead use the 32-bit
> partition value preceding each message set.  Is this correct?  If so, when I 
> am
> creating a produce request for 0.8, what should I use for the 32-bit partition
> value, and how does this relate to the Key field of each individual message?
> Ideally, I would like to just send a produce request and let the broker choose
> the partition.  How do I accomplish this in 0.8, and are there plans to change
> this after 0.8?
>
> Thanks,
> Dave
>
> On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede  
> wrote:
>> No. In 0.8, if you don't specify a key for a message, it is sent to any of
>> the available partitions. In other words, the partition id is selected on
>> the partition and the server doesn't get -1 as the partition id.
>>
>> Thanks,
>> Neha
>>
>>
>> On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote:
>>
>>> In the version 0.8 wire format for a produce request, does a value of -1
>>> still indicate "use a random partition" as it did for 0.7?
>>>
>>> Thanks,
>>> Dave
>>>
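
A minimal sketch of the client-side selection Colin and Neha describe: the
producer picks the partition before building the produce request, sending
messages with no key to an arbitrary available partition and hashing non-null
keys so that messages with the same key land in the same partition. The names
here are assumptions for illustration, not the actual kafka.producer
partitioner API.

import scala.util.Random

object PartitionChooserSketch {
  def choosePartition(key: Option[Array[Byte]],
                      numPartitions: Int,
                      availablePartitions: Seq[Int]): Int =
    key match {
      case None    => availablePartitions(Random.nextInt(availablePartitions.size))
      case Some(k) => math.abs(java.util.Arrays.hashCode(k) % numPartitions)
    }
}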




Re: produce request wire format question

2013-05-21 Thread Neha Narkhede
Dave,

Colin correctly described the producer behavior of picking the partition for a
message before it is sent to the Kafka broker. However, I'm interested in
knowing your use case a little better, to see why you would rather have the
broker decide the partition.

Thanks,
Neha


On Tue, May 21, 2013 at 12:05 PM, Colin Blower wrote:

> The key is used by the client to decide which partition to send the
> message to. By the time the client is creating the produce request, it
> should be known which partition each message is being sent to. I believe
> Neha described the behavior of the Java client which sends messages with
> a null key to any partition.
>
> The key is described in past tense because of the use case for
> persisting keys with messages. The key is persisted through the broker
> so that a consumer knows what key was used to partition the message on
> the producer side.
>
> I don't believe that you can have the broker decide which partition a
> message goes to.
>
> --
> Colin B.
>
> On 05/21/2013 11:48 AM, Dave Peterson wrote:
> > I'm looking at the document entitled "A Guide to the Kafka Protocol"
> > located here:
> >
> > https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html
> >
> > It shows a produce request as containing a number of message sets, which
> are
> > grouped first by topic and second by partition (a 32-bit integer).
> > However, each
> > message in a message set contains a Key field, which is described as
> follows:
> >
> > The key is an optional message key that was used for partition
> assignment.
> > The key can be null.
> >
> > I notice the use of "was" (past tense) above.  That seems to suggest
> that the
> > Key field was once used to specify a partition (at the granularity of
> each
> > individual message), but the plan for the future is to instead use the
> 32-bit
> > partition value preceding each message set.  Is this correct?  If so,
> when I am
> > creating a produce request for 0.8, what should I use for the 32-bit
> partition
> > value, and how does this relate to the Key field of each individual
> message?
> > Ideally, I would like to just send a produce request and let the broker
> choose
> > the partition.  How do I accomplish this in 0.8, and are there plans to
> change
> > this after 0.8?
> >
> > Thanks,
> > Dave
> >
> > On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede 
> wrote:
> >> No. In 0.8, if you don't specify a key for a message, it is sent to any
> of
> >> the available partitions. In other words, the partition id is selected
> on
> >> the partition and the server doesn't get -1 as the partition id.
> >>
> >> Thanks,
> >> Neha
> >>
> >>
> >> On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote:
> >>
> >>> In the version 0.8 wire format for a produce request, does a value of
> -1
> >>> still indicate "use a random partition" as it did for 0.7?
> >>>
> >>> Thanks,
> >>> Dave
> >>>
>
>
>


Re: produce request wire format question

2013-05-21 Thread Dave Peterson
In my case, there is a load balancer between the producers and the
brokers, so I want the behavior described for the Java client (null key
specifies "any partition").  If the Key field of each individual message
specifies the partition to send it to, then I don't understand the purpose
of the 32-bit partition identifier that precedes each message set in a
produce request: what if a produce request specifies "partition N" for a
given message set, and then each individual message in the set
specifies a different partition in its Key field?  Also, the above-
mentioned partition identifier is a 32-bit integer and the Key field of
each individual message can contain data of arbitrary length, which
seems inconsistent.  Is a partition identifier a 32-bit integer, or can it
be of arbitrary length?

Thanks,
Dave

On Tue, May 21, 2013 at 12:30 PM, Neha Narkhede  wrote:
> Dave,
>
> Colin described the producer behavior of picking the partition for a
> message before it is sent to Kafka broker correctly. However, I'm
> interested in knowing your use case a little before to see why you would
> rather have the broker decide the partition?
>
> Thanks,
> Neha
>
>
> On Tue, May 21, 2013 at 12:05 PM, Colin Blower wrote:
>
>> The key is used by the client to decide which partition to send the
>> message to. By the time the client is creating the produce request, it
>> should be known which partition each message is being sent to. I believe
>> Neha described the behavior of the Java client which sends messages with
>> a null key to any partition.
>>
>> The key is described in past tense because of the use case for
>> persisting keys with messages. The key is persisted through the broker
>> so that a consumer knows what key was used to partition the message on
>> the producer side.
>>
>> I don't believe that you can have the broker decide which partition a
>> message goes to.
>>
>> --
>> Colin B.
>>
>> On 05/21/2013 11:48 AM, Dave Peterson wrote:
>> > I'm looking at the document entitled "A Guide to the Kafka Protocol"
>> > located here:
>> >
>> > https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html
>> >
>> > It shows a produce request as containing a number of message sets, which
>> are
>> > grouped first by topic and second by partition (a 32-bit integer).
>> > However, each
>> > message in a message set contains a Key field, which is described as
>> follows:
>> >
>> > The key is an optional message key that was used for partition
>> assignment.
>> > The key can be null.
>> >
>> > I notice the use of "was" (past tense) above.  That seems to suggest
>> that the
>> > Key field was once used to specify a partition (at the granularity of
>> each
>> > individual message), but the plan for the future is to instead use the
>> 32-bit
>> > partition value preceding each message set.  Is this correct?  If so,
>> when I am
>> > creating a produce request for 0.8, what should I use for the 32-bit
>> partition
>> > value, and how does this relate to the Key field of each individual
>> message?
>> > Ideally, I would like to just send a produce request and let the broker
>> choose
>> > the partition.  How do I accomplish this in 0.8, and are there plans to
>> change
>> > this after 0.8?
>> >
>> > Thanks,
>> > Dave
>> >
>> > On Tue, May 21, 2013 at 10:47 AM, Neha Narkhede 
>> wrote:
>> >> No. In 0.8, if you don't specify a key for a message, it is sent to any
>> of
>> >> the available partitions. In other words, the partition id is selected
>> on
>> >> the partition and the server doesn't get -1 as the partition id.
>> >>
>> >> Thanks,
>> >> Neha
>> >>
>> >>
>> >> On Tue, May 21, 2013 at 9:54 AM, Dave Peterson wrote:
>> >>
>> >>> In the version 0.8 wire format for a produce request, does a value of
>> -1
>> >>> still indicate "use a random partition" as it did for 0.7?
>> >>>
>> >>> Thanks,
>> >>> Dave
>> >>>
>>
>>
>>


[jira] [Updated] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-914:
-

Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1 - I added comments with some detail since the manager/fetcher/connector 
interaction is very tricky.
2 - Passing through throwables while shutting down. The isRunning check is 
probably unnecessary, but safer to keep.
3 - Made the following changes to the mirrormaker - I can put that in a 
separate jira as well.
  a - Currently if no streams are created, the mirrormaker doesn't quit. 
Setting streams to empty/nil fixes that issue.
  b - If a consumer-side exception (e.g., iterator timeout) gets thrown the 
mirror-maker does not exit. Addressed this by awaiting on the consumer threads 
at the end of the main method.
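
A rough sketch of the change described in (3b), with hypothetical names rather
than the actual MirrorMaker code: awaiting the consumer threads at the end of
main means that a consumer-side exception which kills those threads lets the
tool exit instead of hanging.

object MirrorMakerExitSketch {
  // Hypothetical consumer loop: reads from the source cluster and enqueues
  // messages; an iterator timeout or other consumer-side exception simply
  // ends the thread.
  private def consumeLoop(): Unit = { /* ... */ }

  def main(args: Array[String]): Unit = {
    val consumerThreads = (1 to 4).map { i =>
      val t = new Thread(new Runnable { def run(): Unit = consumeLoop() },
                         "mirror-consumer-" + i)
      t.start()
      t
    }
    // Await the consumer threads at the end of main: if they die, main
    // returns and the process exits instead of lingering with no streams.
    consumerThreads.foreach(_.join())
  }
}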



> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully so fetchers start and start filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shutdown the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has

[jira] [Commented] (KAFKA-914) Deadlock between initial rebalance and watcher-triggered rebalances

2013-05-21 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663605#comment-13663605
 ] 

Jun Rao commented on KAFKA-914:
---

Thanks for the patch. Looks good. +1. One minor comment: The following 
statement in the catch clause in MirrorMaker is unnecessary.
streams = Nil


> Deadlock between initial rebalance and watcher-triggered rebalances
> ---
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Joel Koshy
> Fix For: 0.8
>
> Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully so fetchers start and start filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shutdown the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to
>   process and tries to add a fetcher for it, it will get an exception
>   (InterruptedException if acquiring the partitionMapLock or