[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-16 Thread Rekha Joshi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497644#comment-14497644
 ] 

Rekha Joshi commented on KAFKA-2098:


Thanks for the prompt reply [~jkreps]. What works for you works for me :-) But 
just out of curiosity, can you please share a link to this Apache requirement? As 
Samza is also a top-level Apache project and has 
https://github.com/apache/samza/tree/master/gradle/wrapper 
Thanks!

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2. Attaching patch. Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2125) Infinite loop after controlled shutdown succeeds

2015-04-16 Thread Jason Rosenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497616#comment-14497616
 ] 

Jason Rosenberg commented on KAFKA-2125:


So, I'd think the exception there before the 'NoSuchElementException' seems 
relevant (since it is complaining about the 1 partition that is then complained 
about repeatedly in the following log).

 Infinite loop after controlled shutdown succeeds
 

 Key: KAFKA-2125
 URL: https://issues.apache.org/jira/browse/KAFKA-2125
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Jason Rosenberg
Priority: Blocker
 Attachments: grep_shut_edited.log


 I have a 4 node cluster, running 0.8.2.1, that got into a bad state last 
 night during a rolling restart.  The first node to be restarted was the 
 controller.  Controlled Shutdown completed successfully, after about 800ms.  
 But after that, the server failed to shut down, and got into what appears to 
 be an infinite cycle, involving the 'Controller-45-to-broker-45-send-thread', 
 and the 'kafka-scheduler-0' thread.  Ultimately, the shutdown timed out 
 (after 3 minutes) and it was terminated by the deployment system, and the 
 server was restarted.  As expected, when it came back up it took some time 
 re-syncing its partitions, but eventually came back and all was well.  
 However, I think there was something fundamentally wrong with the shutdown 
 process.  The controller didn't seem to give up controller status, for one 
 thing, as part of the controlled shutdown (which I should think would be the 
 first thing it should do?).
 Anyway, below are some log snippets. I do have full logs from each broker in 
 the cluster, which I can provide (but would have to significantly anonymize 
 the logs before forwarding along).
 Controlled shutdown starts and succeeds:
 {code}
 2015-04-14 05:56:10,134  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], shutting down
 2015-04-14 05:56:10,136  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Starting controlled shutdown
 2015-04-14 05:56:10,150  INFO [kafka-request-handler-0] 
 controller.KafkaController - [Controller 45]: Shutting down broker 45
 ...
 ...
 2015-04-14 05:56:10,951  INFO [Thread-38] server.KafkaServer - [Kafka Server 
 45], Controlled shutdown succeeded
 {code}
 Subsequently, the 'Controller-45-to-broker-45-send-thread' starts repeatedly 
 spamming the logs, like so:
 {code}
 2015-04-14 05:56:11,281  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,281  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to id:45,host:broker45.square,port:12345 for sending 
 state change requests
 2015-04-14 05:56:11,582  WARN [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 epoch 21 fails to send request 
 Name:LeaderAndIsrRequest;Version:0;Controller:45;ControllerEpoch:21;CorrelationId:305175;ClientId:id_45-host_null-port_12345;Leaders:id:48,host:broker48.square,port:12345;PartitionState:(mytopic,1)
  - 
 (LeaderAndIsrInfo:(Leader:48,ISR:48,LeaderEpoch:5,ControllerEpoch:21),ReplicationFactor:2),AllReplicas:45,48)
  to broker id:45,host:broker45.square,port:12345. Reconnecting to broker.
 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 2015-04-14 05:56:11,582  INFO [Controller-45-to-broker-45-send-thread] 
 controller.RequestSendThread - [Controller-45-to-broker-45-send-thread], 
 Controller 45 connected to 

[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14497680#comment-14497680
 ] 

Ewen Cheslack-Postava commented on KAFKA-2098:
--

The original issue is here: https://issues.apache.org/jira/browse/KAFKA-1490 
The first post of the mailing list thread linked to in that post gives a good 
rundown of the issues: 
http://mail-archives.apache.org/mod_mbox/incubator-general/201406.mbox/%3CCADiKvVs%3DtKDbp3TWRnxds5dVepqcX4kWeYbj7xUx%2BZoDNM_Lyg%40mail.gmail.com%3E
 Sadly, the deeper you dig, the more confusing things get. export vs 
checkout is hinted at here 
http://incubator.apache.org/guides/releasemanagement.html#best-practice-source 
which would indicate you can include binaries in the repo as long as they 
aren't included in the release. 
https://www.apache.org/dev/release-publishing.html#valid talks about all source 
code being appropriately licensed and under the right CLA, the latter 
potentially being a problem for an Apache source release since gradle wouldn't 
be covered by an Apache CLA.

As a side note, downstream packagers sometimes dislike any binaries in source 
releases. See, e.g., 
https://www.debian.org/doc/packaging-manuals/java-policy/x84.html which 
indicates debian maintainers regularly need to repack source releases to remove 
the binary bits. Making sure binaries don't make it into source releases makes 
things a lot less painful for packagers.

If you look into the source release for Samza, you'll see it doesn't contain 
any jars. They're stripped out during the release process (I haven't checked 
how, but just run find on the source tgz contents -- you won't find any jars). 
This matches the export vs. checkout distinction. It also addresses the 
potential CLA issues with a source release. I'm guessing that, given the 
complaints about the missing wrapper, this might be the right thing to do to 
appease Apache, downstream packagers, and developers/users.



Personally, I'm surprised this causes as much frustration as it does -- the 
only time it's been an issue for me is on platforms where the native gradle 
packages are too old. But 

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2. Attaching patch. Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-16 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated KAFKA-2128:
---
Status: Open  (was: Patch Available)

 kafka.Kafka should return non-zero exit code when caught exception.
 ---

 Key: KAFKA-2128
 URL: https://issues.apache.org/jira/browse/KAFKA-2128
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 0.8.3


 The kafka.Kafka object always returns exit code zero.
 I think that it should return a non-zero exit code when an exception is caught
 (for example, when a FileNotFoundException is thrown because server.properties does not
 exist).
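
For illustration, here is a minimal, self-contained Java sketch of the requested behavior (the real entry point is the Scala kafka.Kafka object, and the class and argument handling below are invented): catch the startup exception and exit with a non-zero status so wrapper scripts can detect the failure.

{code}
// Hedged sketch only -- not the actual kafka.Kafka entry point.
import java.io.FileInputStream;
import java.util.Properties;

public class ServerMainSketch {
    public static void main(String[] args) {
        try {
            Properties props = new Properties();
            // Throws FileNotFoundException when e.g. server.properties does not exist
            props.load(new FileInputStream(args[0]));
            // ... start the broker with props ...
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1); // non-zero exit code signals the failure to the caller
        }
    }
}
{code}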



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-16 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated KAFKA-2128:
---
Status: Patch Available  (was: Open)

 kafka.Kafka should return non-zero exit code when caught exception.
 ---

 Key: KAFKA-2128
 URL: https://issues.apache.org/jira/browse/KAFKA-2128
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2128-1.patch


 The kafka.Kafka object always returns exit code zero.
 I think that it should return a non-zero exit code when an exception is caught
 (for example, when a FileNotFoundException is thrown because server.properties does not
 exist).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-16 Thread Guozhang Wang
Hi Adi,

2. I assume you were saying "longer than strictly needed for replication" here?

Also the concern I have is around error code: today if the replication is
not finished within the replication timeout then the error code will be
set accordingly when it returns. Let's say if the produce request is not
satisfied after X (replication timeout) ms, but is satisfied after Y
(throttling timeout), should we still set the error code or not? I think it
is OK to just set NO_ERROR but we need to document such cases clearly for
quota actions mixed with ack = -1.

Guozhang

On Wed, Apr 15, 2015 at 4:23 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Thanks for the review Guozhang.

 1. Agreed.

 2. This proposal actually waits for the maximum of the 2 timeouts. This
 reduces implementation complexity at the cost of waiting longer than
 strictly needed for quotas. Note that this is only for the case where
 acks=-1.

 However we can solve this if it is a significant concern by adding watcher
 keys for all partitions (only if acks=-1). These are the keys we would
 normally add while waiting for acknowledgements. We can change the
 tryComplete() function to return false until 'quota_timeout' time has
 elapsed AND all the acknowledgements have been received.
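
To make that rule concrete, here is a small, hedged Java sketch (the names are invented; the real purgatory code is Scala and has a richer API): the operation is allowed to complete only once the quota delay has elapsed and the required acknowledgements have arrived.

{code}
// Hedged sketch of the completion rule described above; not the purgatory API.
public final class ThrottledAckCompletion {
    private ThrottledAckCompletion() { }

    static boolean tryComplete(long nowMs, long startMs, long quotaDelayMs,
                               int acksReceived, int acksRequired) {
        boolean quotaElapsed = nowMs - startMs >= quotaDelayMs; // throttle time served
        boolean acksSatisfied = acksReceived >= acksRequired;   // acks=-1 condition met
        return quotaElapsed && acksSatisfied;
    }
}
{code}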

 Thanks,
 Aditya
 
 From: Guozhang Wang [wangg...@gmail.com]
 Sent: Wednesday, April 15, 2015 3:42 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 Thanks for the summary. A few comments below:

 1. Say a produce request has replication timeout X, and upon finishing the
 local append it is determined to be throttled Y ms where Y > X, then after
 it has timed out in the purgatory after Y ms we should still check if the
 #.acks has been fulfilled in order to set the correct error codes in the
 response.

 2. I think it is actually common that the calculated throttle time Y is
 less than the replication timeout X, which will be a tricky case since we
 need to make sure 1) the request is held in the purgatory for at least Y
 ms, 2) after Y ms have elapsed, if the #.acks has been fulfilled within X ms then set
 no-error-code and return immediately, 3) after X ms elapsed, set
 timeout-error-code and return.

 Guozhang

 On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  This is an implementation proposal for delaying requests in quotas using
  the current purgatory. I'll discuss the usage for produce and fetch
  requests separately.
 
  1. Delayed Produce Requests - Here, the proposal is basically to reuse
  DelayedProduce objects and insert them into the purgatory with no watcher
  keys if the request is being throttled. The timeout used in the request
  should be the Max(quota_delay_time, replication_timeout).
  In most cases, the quota timeout should be greater than the existing
  timeout but in order to be safe, we can use the maximum of these values.
  Having no watch keys will allow the operation to be enqueued directly
 into
  the timer and will not add any overhead in terms of watching keys (which
  was a concern). In this case, having watch keys is not beneficial since
 the
  operation must be delayed for a fixed amount of time and there is no
  possibility for the operation to complete before the timeout i.e.
  tryComplete() can never return true before the timeout. On timeout, since
  the operation is a TimerTask, the timer will call run() which calls
  onComplete().
  In onComplete(), the DelayedProduce can repeat the check from tryComplete()
  (only if acks=-1: whether all replicas have fetched up to a certain offset) and
  return the response immediately.
 
  Code will be structured as follows in ReplicaManager:appendMessages()
 
  if (isThrottled) {
    val delayedProduce = new DelayedProduce(timeout)
    purgatory.tryCompleteElseWatch(delayedProduce, Seq())
  }
  else if (delayedRequestRequired()) {
    // Insert into purgatory with watched keys for unthrottled requests
  }
 
  In this proposal, we avoid adding unnecessary watches because there is no
  possibility of early completion and this avoids any potential performance
  penalties we were concerned about earlier.
 
  2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the
  DelayedFetch objects and insert them into the purgatory with no watcher
  keys if the request is throttled. Timeout used is the
 Max(quota_delay_time,
  max_wait_timeout). Having no watch keys provides the same benefits as
  described above. Upon timeout, the onComplete() is called and the
 operation
  proceeds normally i.e. perform a readFromLocalLog and return a response.
  The caveat here is that if the request is throttled but the throttle time
  is less than the max_wait timeout on the fetch request, the request will
 be
  delayed to a Max(quota_delay_time, max_wait_timeout). This may be more
 than
  strictly necessary (since we are not watching for
  satisfaction on any keys).
 
  I added some testcases to DelayedOperationTest to verify that 

Re: Review Request 33071: Dynamically load JRE-specific class in TestPurgatoryPerformance

2015-04-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33071/#review80342
---


Cool! It looks good to me.

- Yasuhiro Matsuda


On April 10, 2015, 8:45 a.m., Rajini Sivaram wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33071/
 ---
 
 (Updated April 10, 2015, 8:45 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2113
 https://issues.apache.org/jira/browse/KAFKA-2113
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Patch for KAFKA-2113: Dynamically load JRE-specific class in 
 TestPurgatoryPerformance
 
 
 Diffs
 -
 
   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala 
 962253a7260c90233d4a4c4fe8c75af211453f2a 
 
 Diff: https://reviews.apache.org/r/33071/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Rajini Sivaram
 




[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-16 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498282#comment-14498282
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

Updated reviewboard https://reviews.apache.org/r/33242/diff/
 against branch apache/trunk

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, 
 KAFKA-2121_2015-04-16_09:55:14.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer) {

       // create metrics reporter via reflection
       List<MetricsReporter> reporters =
           config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                         MetricsReporter.class);

       // validate bootstrap servers
       List<InetSocketAddress> addresses =
           ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
   }
   ---
  
   let's say MyMetricsReporter creates a thread in constructor. if hostname
   validation threw an exception, constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created
  thread
   leak issue. this becomes worse when we try to auto recovery (i.e. keep
    creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options of fixing this.
  
   1) just move the hostname validation to the beginning. but this is only
  fix
    one symptom. it didn't fix the fundamental problem. what if some other
  lines
   throw an exception.
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer threw an exception, I can call close method of metrics
   reporters for releasing resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally hiding
   dependency is a bad coding practice. it is also hard to plug in mocks for
   dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix the issue.
  
   Thanks,
   Steven
  
 

[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-16 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121_2015-04-16_09:55:14.patch

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, 
 KAFKA-2121_2015-04-16_09:55:14.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer) {

       // create metrics reporter via reflection
       List<MetricsReporter> reporters =
           config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                         MetricsReporter.class);

       // validate bootstrap servers
       List<InetSocketAddress> addresses =
           ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
   }
   ---
  
   let's say MyMetricsReporter creates a thread in constructor. if hostname
   validation threw an exception, constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created
  thread
   leak issue. this becomes worse when we try to auto recovery (i.e. keep
    creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options of fixing this.
  
   1) just move the hostname validation to the beginning. but this is only
  fix
    one symptom. it didn't fix the fundamental problem. what if some other
  lines
   throw an exception.
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer threw an exception, I can call close method of metrics
   reporters for releasing resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally hiding
   dependency is a bad coding practice. it is also hard to plug in mocks for
   dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix the issue.
  
   Thanks,
   Steven
  
 
 
 
  --
  Thanks,
  Ewen
 
 --
 -- Guozhang



--
This message was sent by Atlassian JIRA

Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 4:55 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

add a unit test file


changes based on Ewen's review feedbacks


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:03 p.m.)


Review request for kafka.


Changes
---

address Ewen's review feedbacks

I'm getting a bunch of checkstyle complaints when I try to test. These should 
all be easy to fix (and should be causing tests to fail before even running). 
The only rule that might not be obvious from the error message is that the 
static final field in MockMetricsReporter is expected to be all-caps since it 
looks like a constant to checkstyle.
 fixed

In the constructor, could we throw some subclass of KafkaException instead? The 
new clients try to stick to that exception hierarchy except in a few special 
cases. Alternatively, maybe if we caught Error and RuntimeException instead of 
Throwable then we could just rethrow the same exception?
 I changed RuntimeException to KafkaException. I can't think of a good subclass 
 name for this scenario (ProducerConstructException?), hence staying with the 
 generic KafkaException.

The new version of close() will swallow exceptions when called normally (i.e. 
not from the constructor). They'll be logged, but the caller won't see the 
exception anymore. Maybe we should save the first exception and rethrow it?
 refactored a private close(boolean swallowException) method

Exception messages should be capitalized.
 fixed

In the test, we should probably have an assert outside the catch. And is there 
any reason the closeCount is being reset to 0?
 yes. we should have an assert outside the catch
 I was just resetting the CLOSE_COUNT in case another test method needs to check 
 the count.
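
To make the agreed pattern concrete, here is a hedged, self-contained Java sketch of the construct-then-clean-up approach discussed in this review (the resource names are invented and the real KafkaProducer fields differ): the constructor catches failures, closes whatever was already initialized, and close(boolean) either swallows or rethrows the first exception.

{code}
// Hedged illustration only; field names are invented, not the KafkaProducer ones.
import java.io.Closeable;
import java.io.IOException;

public class LeakSafeClient implements Closeable {
    private Closeable metricsReporter;
    private Closeable ioThread;

    public LeakSafeClient(Closeable metricsReporter, Closeable ioThread, String bootstrap) {
        try {
            this.metricsReporter = metricsReporter;
            this.ioThread = ioThread;
            if (bootstrap == null || bootstrap.isEmpty())   // stands in for address validation
                throw new IllegalArgumentException("No bootstrap servers given");
        } catch (RuntimeException e) {
            close(true);                                    // clean up partially built state
            throw new RuntimeException("Failed to construct client", e);
        }
    }

    @Override
    public void close() throws IOException {
        close(false);                                       // normal close: rethrow failures
    }

    private void close(boolean swallowException) {
        RuntimeException first = null;
        for (Closeable c : new Closeable[] {metricsReporter, ioThread}) {
            try {
                if (c != null)                              // null-safe: field may never have been set
                    c.close();
            } catch (Exception e) {
                if (first == null)
                    first = new RuntimeException(e);
            }
        }
        if (first != null && !swallowException)
            throw first;                                    // surface the first failure to callers
    }
}
{code}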


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

fix potential resource leak when KafkaProducer throws exception in the middle


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-04-16 Thread Joe Stein
1. agreed

2. agree new error

3. having discrete operations for tasks makes sense, combining them is
confusing for users I think. +1 for letting the user change only one thing at a
time

4. let's be consistent with both the new code and the existing code. let's not
confuse the user but give them the right error information so they know
what they did wrong without much fuss.

~ Joe Stein
- - - - - - - - - - - - - - - - -

  http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Wed, Apr 15, 2015 at 1:23 PM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Guys,

 Thanks for the discussion!

 Summary:

 1. Q: How KAFKA-1367 (isr is inconsistent in brokers' metadata cache) can
 affect implementation?
 A: We can fix this issue for the leading broker - ReplicaManager (or
 Partition)
 component should have accurate isr list, then with leading broker
 having correct
 info, to do a describe-topic we will need to define leading brokers
 for partitions
 and ask those for a correct isr list.
 Also, we should consider adding lag information to TMR for each
 follower for
 partition reassignment, as Jun suggested above.

 2. Q: What if user adds different alter commands for the same topic in
 scope
  of one batch request?
 A: Because of the async nature of AlterTopicRequest it will be very
 hard then
 to assemble the expected (in terms of checking whether request is
 complete)
 result if we let users do this. Also it will be very confusing. It
 was proposed not to
 let users do this (probably add new Error for such cases).

 3. Q: AlterTopicRequest semantics: now when we merged AlterTopic and
 ReassingPartitons in which order AlterTopic fields should be
 resolved?
 A: This item is not clear. There was a proposal to let user change only
 one thing at a time, e.g. specify either new Replicas, or
 ReplicaAssignment.
 This can be a simple solution, but it's a very strict rule. E.g.
 currently with
 TopicCommand user can increase nr of partitions and define replica
 assignment
 for newly added partitions. Taking into account item 2. this will
 be even harder
 for user to achieve this.

 4. Q: Do we need such accurate errors returned from the server:
 InvalidArgumentPartitions,
  InvalidArgumentReplicas etc.
 A: I started implementation to add proposed error codes and now I think
 probably
 InvalidArgumentError should be sufficient. We can do simple
 validations on
 the client side (e.g. AdminClient can ensure nr of partitions
 argument is positive),
 others - which can be covered only on server (probably invalid
 topic config,
 replica assignment includes dead broker etc) - will be done on
 server, and in case
 of invalid argument we will return InvalidArgumentError without
 specifying the
 concrete field.

 It'd be great if we could cover these remaining issues, looks like they are
 minor,
 at least related to specific messages, not the overall protocol. - I think
 with that I can
 update confluence page and update patch to reflect all discussed items.
 This patch
 will probably include Wire protocol messages and server-side code to handle
 new
 requests. AdminClient and cli-tool implementation can be the next step.

 Thanks,
 Andrii Biletskyi

 On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao j...@confluent.io wrote:

  Andrii,
 
  500. I think what you suggested also sounds reasonable. Since ISR is only
  maintained accurately at the leader, TMR can return ISR if the broker is
  the leader of a partition. Otherwise, we can return an empty ISR. For
  partition reassignment, it would be useful to know the lag of each
  follower. Again, the leader knows this info. We can probably include that
  info in TMR as well.
 
  300. I think it's probably reasonable to restrict AlterTopicRequest to
  change only one thing at a time, i.e., either partitions, replicas,
 replica
  assignment or config.
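
  For illustration, a hedged Java sketch of the one-change-at-a-time validation described above (the field names and return convention are illustrative, not the KIP-4 request schema):

{code}
// Hedged sketch only; not the actual AlterTopicRequest validation code.
public final class AlterTopicValidation {
    private AlterTopicValidation() { }

    static boolean changesExactlyOneThing(Integer partitions, Integer replicas,
                                          Object replicaAssignment, Object config) {
        int changes = 0;
        if (partitions != null) changes++;
        if (replicas != null) changes++;
        if (replicaAssignment != null) changes++;
        if (config != null) changes++;
        return changes == 1;   // reject requests that mix several alterations
    }
}
{code}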
 
  Thanks,
 
  Jun
 
  On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Jun,
  
   404. Great, thanks!
  
   500. If I understand correctly KAFKA-1367 says ISR part of TMR may
   be inconsistent. If so, then I believe all admin commands but
  describeTopic
   are not affected. Let me emphasize that it's about AdminClient
  operations,
   not about Wire Protocol requests. What I mean:
   To verify AdminClient.createTopic we will need (consistent) 'topics'
 set
   from TMR (we don't need isr)
   To verify alterTopic - again, probably 'topics' and 'assigned
 replicas' +
   configs
   To verify deleteTopic - only 'topics'
   To verify preferredReplica - 'leader', 'assigned replicas'
   To verify reassignPartitions - 'assigned replicas' ? (I'm not sure
 about
   this one)
   If everything above is correct, then AdminClient.describeTopic is the
  only
   command under risk. We can actually workaround it - find out 

[DISCUSSION] KIP-11: ACL Management

2015-04-16 Thread Gwen Shapira
Hi Kafka Authorization Fans,

I'm starting a new thread on a specific sub-topic of KIP-11, since
this is a bit long :)

Currently KIP-11, as I understand it, proposes:
* Authorizers are pluggable, with Kafka providing DefaultAuthorizer.
* Kafka tools allow adding / managing ACLs.
* Those ACLs are stored in ZK and cached in a new TopicCache
* Authorizers can either use the ACLs defined and stored in Kafka, or
define and use their own.

I am concerned of two possible issues with this design:
1. Separation of concerns - only authorizers should worry about ACLs,
and therefore the less code for ACLs that exist in Kafka core, the
better.
2. User confusion - It sounded like we can define ACLs in Kafka itself
but authorizers can also define their own, so kafka-topics
--describe may show an ACL different than the one in use. This can be
super confusing for admins.

My alternative suggestion:
* Authorizer API will include:
 grantPrivilege(List<Principal>, List<Privilege>)
 revokePrivilege(List<Principal>, List<Privilege>)
 getPrivilegesByPrincipal(Principal, Resource)
 
 (The exact API can be discussed in detail, but you get the idea; a rough
 sketch follows below this list.)
* Kafka tools will simply invoke these APIs when topics are added /
modified / described.
* Each authorizer (including the default one) will be responsible for
storing, caching and using those ACLs.
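
To make the suggestion concrete, here is a rough Java sketch of such an Authorizer surface (all type and method names are illustrative only, not the eventual KIP-11 API):

{code}
// Hedged sketch of the pluggable Authorizer outlined above.
import java.util.List;

interface Principal { }
interface Privilege { }
interface Resource { }

public interface Authorizer {
    // Grant the given privileges to the principals; the authorizer stores and caches them itself.
    void grantPrivilege(List<Principal> principals, List<Privilege> privileges);

    // Revoke previously granted privileges.
    void revokePrivilege(List<Principal> principals, List<Privilege> privileges);

    // Report what a principal may do on a resource, so tools such as
    // kafka-topics --describe can show the ACLs actually in effect.
    List<Privilege> getPrivilegesByPrincipal(Principal principal, Resource resource);
}
{code}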

This way, we keep almost all ACL code with the Authorizer, where it
belongs and users get a nice unified interface that reflects what is
actually getting used in the system.
This is pretty much how Sqoop and Hive implement their authorization APIs.

What do you think?

Gwen


[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-16 Thread alexcb (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498307#comment-14498307
 ] 

alexcb commented on KAFKA-2098:
---

Thanks for the notes everyone.

gradlew (the gradle wrapper) is designed to allow users who do not have gradle 
set up to build the project. If you require gradle to be set up ahead of time, 
then it seems like one could just use gradle directly.

For example:

{noformat}
[root@feb6842638b9 kafkatest]# tar xzf kafka-0.8.2.1-src.tgz 
[root@feb6842638b9 kafkatest]# cd kafka-0.8.2.1-src
[root@feb6842638b9 kafka-0.8.2.1-src]# rm gradlew gradlew.bat   *** me 
simulating removing these files from the release
[root@feb6842638b9 kafka-0.8.2.1-src]# gradle jar

... output removed...

:core:processResources UP-TO-DATE
:core:classes
:core:copyDependantLibs
:core:jar
:examples:compileJava
:examples:processResources UP-TO-DATE
:examples:classes
:examples:jar
:contrib:hadoop-consumer:compileJava
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:contrib:hadoop-consumer:processResources UP-TO-DATE
:contrib:hadoop-consumer:classes
:contrib:hadoop-consumer:jar
:contrib:hadoop-producer:compileJava
:contrib:hadoop-producer:processResources UP-TO-DATE
:contrib:hadoop-producer:classes
:contrib:hadoop-producer:jar

BUILD SUCCESSFUL
{noformat}

By using this directly, we 1) avoid the confusion of distributing a non-working 
gradle wrapper, and 2) bypass the wrapper and just use gradle directly.

Thanks.

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2. Attaching patch. Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-16 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated KAFKA-2128:
---
Attachment: KAFKA-2128-1.patch

Uploaded a patch to fix this problem.

 kafka.Kafka should return non-zero exit code when caught exception.
 ---

 Key: KAFKA-2128
 URL: https://issues.apache.org/jira/browse/KAFKA-2128
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2128-1.patch


 The kafka.Kafka object always returns exit code zero.
 I think that it should return a non-zero exit code when an exception is caught
 (for example, when a FileNotFoundException is thrown because server.properties does not
 exist).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-16 Thread Sasaki Toru (JIRA)
Sasaki Toru created KAFKA-2128:
--

 Summary: kafka.Kafka should return non-zero exit code when caught 
exception.
 Key: KAFKA-2128
 URL: https://issues.apache.org/jira/browse/KAFKA-2128
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 0.8.3


The kafka.Kafka object always returns exit code zero.
I think that it should return a non-zero exit code when an exception is caught
(for example, when a FileNotFoundException is thrown because server.properties does not exist).




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2128) kafka.Kafka should return non-zero exit code when caught exception.

2015-04-16 Thread Sasaki Toru (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sasaki Toru updated KAFKA-2128:
---
Status: Patch Available  (was: Open)

 kafka.Kafka should return non-zero exit code when caught exception.
 ---

 Key: KAFKA-2128
 URL: https://issues.apache.org/jira/browse/KAFKA-2128
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.3
Reporter: Sasaki Toru
Priority: Minor
 Fix For: 0.8.3


 The kafka.Kafka object always returns exit code zero.
 I think that it should return a non-zero exit code when an exception is caught
 (for example, when a FileNotFoundException is thrown because server.properties does not
 exist).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/
---

(Updated April 16, 2015, 5:44 p.m.)


Review request for kafka.


Bugs: KAFKA-2121
https://issues.apache.org/jira/browse/KAFKA-2121


Repository: kafka


Description (updated)
---

add a unit test file


changes based on Ewen's review feedbacks


fix capitalization in error log


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java 
PRE-CREATION 

Diff: https://reviews.apache.org/r/33242/diff/


Testing
---


Thanks,

Steven Wu



Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-16 Thread Jiangjie Qin
Hi Harsha,

Took a quick look at the patch. I think it is still a little bit
different. KAFKA-1788 only handles the case where a batch has been sitting in
the accumulator for too long. The KIP is trying to solve the issue where a
batch has already been drained from the accumulator and sent to the broker.
We might be able to apply the timeout at the batch level to merge those two cases
as Ewen suggested. But I’m not sure if it is a good idea to allow messages
whose target partition is offline to sit in the accumulator in the first place.

Jiangjie (Becket) Qin
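
For readers following the KIP, here is a hedged, self-contained Java sketch of the in-flight request timeout it proposes (the class and method names are invented; this is not the NetworkClient implementation): remember when each request was sent and, on every poll, expire requests whose response has not arrived within a configurable timeout, which also covers the case where the broker stops responding while the TCP connection stays ESTABLISHED.

{code}
// Hedged sketch only; illustrates the idea, not the actual client code.
import java.util.ArrayDeque;
import java.util.Deque;

public class InFlightTimeoutSketch {
    static final class InFlight {
        final int correlationId;
        final long sendTimeMs;
        InFlight(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final Deque<InFlight> inFlight = new ArrayDeque<>();
    private final long requestTimeoutMs;

    public InFlightTimeoutSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    public void onSend(int correlationId, long nowMs) {
        inFlight.addLast(new InFlight(correlationId, nowMs));   // oldest request stays at the head
    }

    // Called periodically (e.g. from poll()): fail requests whose response never arrived.
    public void expireOverdue(long nowMs) {
        while (!inFlight.isEmpty()
                && nowMs - inFlight.peekFirst().sendTimeMs > requestTimeoutMs) {
            InFlight expired = inFlight.pollFirst();
            System.out.println("Request " + expired.correlationId
                    + " timed out; complete it exceptionally and close the connection");
        }
    }
}
{code}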

On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Guozhang and Jiangjie,
 Isn’t this work being covered in
https://issues.apache.org/jira/browse/KAFKA-1788? Can you please
review the patch there?
Thanks,
Harsha


On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
wrote:

Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be
detected by the selector, but rather will only be revealed upon TCP
timeout, which could be hours.

A couple of comments on the wiki:

1. For KafkaProducer.close() and KafkaProducer.flush() we need the
request  
timeout as implicit timeout. I am not very clear on what this means.

2. Currently the producer already has a TIMEOUT_CONFIG which should
really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
 
it will change the config names but will reduce confusions moving
forward.  


Guozhang  


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
 
wrote:  

 Checked the code again. It seems that the disconnected channel is not
 detected by selector as expected.
  
 Currently we are depending on the
 o.a.k.common.network.Selector.disconnected set to see if we need to do
 something for a disconnected channel.
 However Selector.disconnected set is only updated when:
 1. A write/read/connect to channel failed.
 2. A Key is canceled
 However when a broker is down before it sends back the response, the
 client seems not to be able to detect this failure.
  
 I did a simple test below:
 1. Run a selector on one machine and an echo server on another machine.
 
 Connect a selector to an echo server
 2. Send a message to echo server using selector, then let the selector
 poll() every 10 seconds.
 3. After the server received the message, unplug the cable on the echo
server.  
 4. After waiting for 45 min, the selector still did not detect the
 network failure.
 Lsof on selector machine shows that the TCP connection is still
considered  
 ESTABLISHED.  
  
 I’m not sure in this case what we should expect from the
 java.nio.channels.Selector. According to the document, the selector
does  
 not verify the status of the associated channel. In my test case it
looks  
 even worse: the OS did not consider the socket to be disconnected.
  
 Anyway. It seems adding the client side request timeout is necessary.
I’ve  
 updated the KIP page to clarify the problem we want to solve according
to  
 Ewen’s comments.
  
 Thanks.  
  
 Jiangjie (Becket) Qin
  
 On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
 
  
 On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:  
   
  Hi Ewen, thanks for the comments. Very good points! Please see
replies  
  inline.  
   
   
  On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:  
   
  Jiangjie,  

  Great start. I have a couple of comments.

  Under the motivation section, is it really true that the request
will  
  never  
  be completed? Presumably if the broker goes down the connection
will be  
  severed, at worst by a TCP timeout, which should clean up the
 connection  
  and any outstanding requests, right? I think the real reason we
need a  
  different timeout is that the default TCP timeouts are ridiculously
 
 long  
  in  
  this context.
  Yes, when broker is completely down the request should be cleared as
you  
  said. The case we encountered looks like the broker was just not
  responding but TCP connection was still alive though.
   
   
 Ok, that makes sense.
   
   
   

  My second question is about whether this is the right level to
tackle  
 the  
  issue/what user-facing changes need to be made. A related problem
came  
 up  
  in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
  records  
  get stuck indefinitely because there's no client-side timeout. This
KIP  
  wouldn't fix that problem or any problems caused by lack of
 connectivity  
  since this would only apply to in flight requests, which by
definition  
  must  
  have been sent on an active connection.

  I suspect both types of problems probably need to be addressed
 separately  
  by introducing explicit timeouts. However, because the settings
 introduced  
  here are very much about the internal implementations of the
clients,  
 I'm  
  wondering if this even 

[jira] [Updated] (KAFKA-2113) TestPurgatoryPerformance does not compile using IBM JDK

2015-04-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2113:
---
   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

 TestPurgatoryPerformance does not compile using IBM JDK
 ---

 Key: KAFKA-2113
 URL: https://issues.apache.org/jira/browse/KAFKA-2113
 Project: Kafka
  Issue Type: Bug
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 0.8.3

 Attachments: KAFKA-2113.patch


 TestPurgatoryPerformance uses a class from the com.sun package that is not 
 available in the IBM JDK.
 The code does handle the class not found exception at runtime if run with an 
 IBM JRE (and prints out -1 as CPU time). But as a result of the direct 
 reference to the com.sun.management.OperatingSystemMXBean class, Kafka core 
 project no longer compiles with an IBM JDK. 
 {quote}
 :core:compileTestScala
 /kafka/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala:88:
  type OperatingSystemMXBean is not a member of package com.sun.management
   
 Some(ManagementFactory.getOperatingSystemMXBean().asInstanceOf[com.sun.management.OperatingSystemMXBean])
   
   ^
 one error found
  FAILED
 {quote}
 The JRE-specific class should be dynamically loaded to enable the class to be 
 compiled as well as run with any JDK.
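
For illustration, a minimal Java sketch of the dynamic-loading approach (the committed fix is in Scala and may differ in detail): look the com.sun type up reflectively so the file compiles on any JDK, and fall back to -1 when the class is absent, e.g. on the IBM JDK.

{code}
// Hedged sketch; the actual TestPurgatoryPerformance patch is Scala.
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.Method;

public class CpuTimeSketch {
    public static long processCpuTimeNanos() {
        OperatingSystemMXBean bean = ManagementFactory.getOperatingSystemMXBean();
        try {
            // Resolved only at runtime, so there is no compile-time dependency on com.sun classes
            Method m = Class.forName("com.sun.management.OperatingSystemMXBean")
                            .getMethod("getProcessCpuTime");
            return (Long) m.invoke(bean);
        } catch (Exception e) {
            return -1L; // JREs without the com.sun class (e.g. IBM JDK) report -1, as before
        }
    }
}
{code}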



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2120) Add a request timeout to NetworkClient

2015-04-16 Thread Suresh Srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14498340#comment-14498340
 ] 

Suresh Srinivas commented on KAFKA-2120:


KAFKA-1788, which has been in the works for some time now, addresses this, right? 

 Add a request timeout to NetworkClient
 --

 Key: KAFKA-2120
 URL: https://issues.apache.org/jira/browse/KAFKA-2120
 Project: Kafka
  Issue Type: New Feature
Reporter: Jiangjie Qin
Assignee: Mayuresh Gharat

 Currently NetworkClient does not have a timeout setting for requests. So if 
 no response is received for a request due to reasons such as broker is down, 
 the request will never be completed.
 Request timeout will also be used as implicit timeout for some methods such 
 as KafkaProducer.flush() and kafkaProducer.close().
 KIP-19 is created for this public interface change.
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


Looks good, left a few comments.

KafkaConsumer suffers from this same problem. Patching that should be pretty 
much identical -- any chance you could extend this to cover that as well?


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/33242/#comment130187

This code is all single threaded, is the AtomicReference really necessary 
here?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/33242/#comment130194

Minor, but these messages should be cleaned up -- just needs some 
capitalization.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/33242/#comment130195

One idea for making this less verbose and redundant: make all of these 
classes implement Closeable so we can just write one utility method for trying 
to close something and catching the exception.
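
A hedged sketch of the kind of helper being suggested (the name and logging are illustrative, not an existing Kafka utility):

{code}
// Illustrative only; assumes the resources being closed implement java.io.Closeable.
import java.io.Closeable;

final class CloseUtils {
    private CloseUtils() { }

    static void closeQuietly(Closeable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                // log and swallow so one failed close does not hide the others
                System.err.println("Failed to close " + name + ": " + e);
            }
        }
    }
}
{code}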


- Ewen Cheslack-Postava


On April 16, 2015, 5:03 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 16, 2015, 5:03 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 fix potential resource leak when KafkaProducer throws exception in the middle
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-16 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: KAFKA-2121_2015-04-16_10:43:55.patch

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that let you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
   public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer) {

       // create metrics reporter via reflection
       List<MetricsReporter> reporters =
           config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                         MetricsReporter.class);

       // validate bootstrap servers
       List<InetSocketAddress> addresses =
           ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
   }
   ---
  
   let's say MyMetricsReporter creates a thread in constructor. if hostname
   validation threw an exception, constructor won't call the close method of
   MyMetricsReporter to clean up the resource. as a result, we created
  thread
   leak issue. this becomes worse when we try to auto recovery (i.e. keep
    creating KafkaProducer again -> failing again -> more thread leaks).
  
   there are multiple options of fixing this.
  
   1) just move the hostname validation to the beginning. but this is only
  fix
    one symptom. it didn't fix the fundamental problem. what if some other
  lines
   throw an exception.
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
   3) explicitly declare the dependency in the constructor. this way, when
   KafkaProducer threw an exception, I can call close method of metrics
   reporters for releasing resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally hiding
   dependency is a bad coding practice. it is also hard to plug in mocks for
   dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix the issue.
  
   Thanks,
   Steven
  
 
 
 
  --
  Thanks,
  Ewen
 
 --
 -- Guozhang



--

Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 531
  https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line531
 
  This code is all single threaded, is the AtomicReference really 
  necessary here?

not really necessary. just trying to use the compareAndSet. otherwise, I need 
to do if(firstException == null) firstException = t. I can certainly change 
it. let me know.


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 548
  https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
 
  One idea for making this less verbose and redundant: make all of these 
  classes implement Closeable so we can just write one utility method for 
  trying to close something and catching the exception.

yes. I thought about it. it may break binary compatibility, e.g. Serializer. 
Sender and Metrics classes are probably only used internally. let me know your 
thoughts.


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


On April 16, 2015, 5:44 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 16, 2015, 5:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 add a unit test file
 
 
 changes based on Ewen's review feedbacks
 
 
 fix capitalization in error log
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




[jira] [Commented] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-16 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498371#comment-14498371
 ] 

Steven Zhen Wu commented on KAFKA-2121:
---

Updated reviewboard https://reviews.apache.org/r/33242/diff/
 against branch apache/trunk

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121.patch.2, 
 KAFKA-2121_2015-04-16_09:55:14.patch, KAFKA-2121_2015-04-16_10:43:55.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
    Serializer<V> valueSerializer) {
   
    // create metrics reporter via reflection
    List<MetricsReporter> reporters =
   config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
    MetricsReporter.class);
   
    // validate bootstrap servers
    List<InetSocketAddress> addresses =
   ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  
   }
   ---
  
    let's say MyMetricsReporter creates a thread in its constructor. if hostname
    validation throws an exception, the constructor won't call the close method of
    MyMetricsReporter to clean up the resource. as a result, we create a thread
    leak issue. this becomes worse when we try to auto recover (i.e. keep
    creating KafkaProducer again - failing again - more thread leaks).
  
   there are multiple options of fixing this.
  
    1) just move the hostname validation to the beginning. but this only fixes
    one symptom. it doesn't fix the fundamental problem. what if some other
    lines throw an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
    3) explicitly declare the dependencies in the constructor. this way, when
    KafkaProducer throws an exception, I can call the close method of the metrics
    reporters to release resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally hiding
    dependencies is a bad coding practice. it is also hard to plug in mocks for
    dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix 

Re: [DISCUSS] New consumer offset commit API

2015-04-16 Thread Jiangjie Qin
Hey Ewen, 

This makes sense. People usually do not want to stop consuming when
committing offsets.

One corner case about async commit with retries that I am thinking of is that
two offset commits could interleave with each other and that
might create a problem. Like you said, maybe we can cancel the previous one.

Another thing is whether the future mechanism will only be applied to
auto commit or will also be used in manual commit. Because in the new
consumer we allow the user to provide an offset map for offset commit, simply
canceling a previous pending offset commit does not seem ideal in
this case, because the two commits could be for different partitions.

Thanks.

Jiangjie (Becket) Qin

On 4/14/15, 4:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

I'd like to get some feedback on changing the offset commit API in the new
consumer. Since this is user-facing API I wanted to make sure this gets
better visibility than the JIRA (
https://issues.apache.org/jira/browse/KAFKA-2123) might.

The motivation is to make it possible to do async commits but be able to
tell when the commit completes/fails. I'm suggesting changing the API from

void commit(Map offsets, CommitType)

to

Future<Void> commit(Map<TopicPartition, Long> offsets,
ConsumerCommitCallback callback);

which matches the approach used for the producer. The
ConsumerCommitCallback only has one method:

public void onCompletion(Exception exception);

This enables a few different use cases:

* Blocking commit via Future.get(), and blocking with timeouts via
Future.get(long, TimeUnit)
* See exceptions via the future (see discussion of retries below)
* Callback-based notification so you can keep processing messages and only
take action if something goes wrong, takes too long, etc. This is the use
case that motivated
* Fire and forget commits via a shorthand commit() API and ignoring the
resulting future.

One big difference between this and the producer API is that there isn't
any result (except maybe an exception) from commitOffsets. This leads to
the somewhat awkward Future<Void> signature. I personally prefer that to
the sync/async flag, especially since it also provides a non-blocking
interface for checking whether the commit is complete.
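
To make the shape concrete, here is a sketch of the proposed surface (following the
description above; the final signatures may well differ from this):

    import java.util.Map;
    import java.util.concurrent.Future;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the proposed API shape; signatures follow the proposal above and may
    // differ from what is finally committed.
    interface ConsumerCommitCallback {
        void onCompletion(Exception exception);   // exception == null means the commit succeeded
    }

    interface OffsetCommitter {
        // The Future lets callers block (optionally with a timeout), poll for completion,
        // or simply ignore the result for fire-and-forget commits.
        Future<Void> commit(Map<TopicPartition, Long> offsets, ConsumerCommitCallback callback);
    }

A caller could then do commit(offsets, null).get(5, TimeUnit.SECONDS) for a bounded
blocking commit, or pass a callback and ignore the returned Future.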

I posted a WIP patch to the JIRA. In the progress of making it I found a
few issues that might be worth discussing:

1. Retries. In the old approach, this was trivial since it only applied to
synchronous calls, so we could just loop until the request was successful.
Do we want to start introducing a retry mechanism here, and should it
apply to all types of requests, or are we going to end up with a couple of
different retry settings for specific cases, like offset commit? The WIP
patch allows errors to bubble up through the Future on the first failure,
which right now can cause some tests to fail transiently (e.g. consumer
bounce test).

I think some sort of retry mechanism, even if it's an internal constant
rather than configurable, is probably the right solution, but I want to
figure out how broadly they should apply. I think adding them only for
offset commits isn't hard.

2. The Future implementation is a bit weird because the consumer doesn't
have a dedicated IO thread. My only concern is that this could lead to
some
unintuitive results based on the current implementation because the way
this works is to just run poll() in the thread calling Future.get(), but
it
mutes all non-coordinator nodes which means other processing is
effectively
paused. If you're processing offset commits in a separate thread from your
main consumer thread that's calling poll(), you might just end up blocking
the main thread while waiting on the Future. Then again, I'm not sure the
other nodes really even need to be muted -- maybe Jay or Guozhang have
ideas on this?

3. Should the future be cancellable? This probably isn't hard to
implement,
but I'm not sure we should even bother. On the one hand it could be nice,
especially if you have an old commit request that you want superseded by
a new one with updated offsets. On the other hand, if the request has
already been sent out, cancelling it won't accomplish anything. I think
the
only case this is useful is when there are retries.

Thoughts?

-- 
Thanks,
Ewen



[jira] [Updated] (KAFKA-2121) error handling in KafkaProducer constructor

2015-04-16 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated KAFKA-2121:
--
Attachment: (was: KAFKA-2121.patch.2)

 error handling in KafkaProducer constructor
 ---

 Key: KAFKA-2121
 URL: https://issues.apache.org/jira/browse/KAFKA-2121
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
Reporter: Steven Zhen Wu
Assignee: Jun Rao
 Attachments: KAFKA-2121.patch, KAFKA-2121_2015-04-16_09:55:14.patch, 
 KAFKA-2121_2015-04-16_10:43:55.patch


 On Mon, Apr 13, 2015 at 7:17 PM, Guozhang Wang wangg...@gmail.com wrote:
 It is a valid problem and we should correct it as soon as possible, I'm
 with Ewen regarding the solution.
 On Mon, Apr 13, 2015 at 5:05 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
  Steven,
 
  Looks like there is even more that could potentially be leaked -- since key
  and value serializers are created and configured at the end, even the IO
  thread allocated by the producer could leak. Given that, I think 1 isn't a
  great option since, as you said, it doesn't really address the underlying
  issue.
 
  3 strikes me as bad from a user experience perspective. It's true we might
  want to introduce additional constructors to make testing easier, but the
  more components I need to allocate myself and inject into the producer's
  constructor, the worse the default experience is. And since you would have
  to inject the dependencies to get correct, non-leaking behavior, it will
  always be more code than previously (and a backwards incompatible change).
  Additionally, the code creating the producer would have to be more
  complicated since it would have to deal with the cleanup carefully whereas
  it previously just had to deal with the exception. Besides, for testing
  specifically, you can avoid exposing more constructors just for testing by
  using something like PowerMock that lets you mock private methods. That
  requires a bit of code reorganization, but doesn't affect the public
  interface at all.
 
  So my take is that a variant of 2 is probably best. I'd probably do two
  things. First, make close() safe to call even if some fields haven't been
  initialized, which presumably just means checking for null fields. (You
  might also want to figure out if all the methods close() calls are
  idempotent and decide whether some fields should be marked non-final and
  cleared to null when close() is called). Second, add the try/catch as you
  suggested, but just use close().
 
  -Ewen
 
 
  On Mon, Apr 13, 2015 at 3:53 PM, Steven Wu stevenz...@gmail.com wrote:
 
   Here is the resource leak problem that we have encountered when 0.8.2
  java
   KafkaProducer failed in constructor. here is the code snippet of
   KafkaProducer to illustrate the problem.
  
   ---
    public KafkaProducer(ProducerConfig config, Serializer<K> keySerializer,
    Serializer<V> valueSerializer) {
   
    // create metrics reporter via reflection
    List<MetricsReporter> reporters =
   config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
    MetricsReporter.class);
   
    // validate bootstrap servers
    List<InetSocketAddress> addresses =
   ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
  
   }
   ---
  
    let's say MyMetricsReporter creates a thread in its constructor. if hostname
    validation throws an exception, the constructor won't call the close method of
    MyMetricsReporter to clean up the resource. as a result, we create a thread
    leak issue. this becomes worse when we try to auto recover (i.e. keep
    creating KafkaProducer again - failing again - more thread leaks).
  
   there are multiple options of fixing this.
  
    1) just move the hostname validation to the beginning. but this only fixes
    one symptom. it doesn't fix the fundamental problem. what if some other
    lines throw an exception?
  
   2) use try-catch. in the catch section, try to call close methods for any
   non-null objects constructed so far.
  
    3) explicitly declare the dependencies in the constructor. this way, when
    KafkaProducer throws an exception, I can call the close method of the metrics
    reporters to release resources.
    KafkaProducer(..., List<MetricsReporter> reporters)
    we don't have to use a dependency injection framework. but generally hiding
    dependencies is a bad coding practice. it is also hard to plug in mocks for
    dependencies. this is probably the most intrusive change.
  
   I am willing to submit a patch. but like to hear your opinions on how we
   should fix the issue.
  
   Thanks,
   Steven
  
 
 
 
  --
  Thanks,
  Ewen
 
 --
 -- Guozhang



--
This message was sent by 

[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-16 Thread alexcb (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498750#comment-14498750
 ] 

alexcb commented on KAFKA-2098:
---

Perhaps I was unclear in what I was suggesting. Since [~gwenshap] mentioned 
that we don't want jars in the release, which I completely agree with, then we 
should also remove the auto generated gradlew and gradlew.bat scripts.

When gradlew is run, it hardcodes the CLASSPATH to 
./gradle/wrapper/gradle-wrapper.jar, and I don't see any mechanism in the 
script to include a user-defined CLASSPATH. I fail to see how it's even 
possible to run gradlew in its current state without running gradle first.

Can you elaborate on how it is possible to use the gradlew script without 
running gradle first? i.e., how does it run if 
./gradle/wrapper/gradle-wrapper.jar does not exist?

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2.Attaching patch.Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498774#comment-14498774
 ] 

Ewen Cheslack-Postava commented on KAFKA-2098:
--

Sorry @alexcb, I think we were just talking past each other. Yes, I agree that 
having the wrapper scripts is pointless without the jars since you need gradle 
anyway. I was just pointing out that even if you require another gradle 
installation to bootstrap, we might still want to rely on the wrapper. For 
example, I'm not sure we want to change the README to change all the gradlew 
commands to use gradle directly like you demonstrated above. Using an existing 
gradle installation only to bootstrap the wrapper and then using the wrapper 
for everything can remove a bunch of compatibility issues.

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2.Attaching patch.Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-16 Thread Aditya Auradkar
Hey Guozhang,

I don't think we should return an error if the request is satisfied after Y 
(throttling timeout) because it may cause the producer to think that the 
request was not ack'ed at all. 

Aditya


From: Guozhang Wang [wangg...@gmail.com]
Sent: Thursday, April 16, 2015 9:06 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hi Adi,

2. I assume you were saying "than strictly needed for replication" here?

Also the concern I have is around the error code: today, if the replication is
not finished within the replication timeout then the error code will be
set accordingly when it returns. Let's say the produce request is not
satisfied after X (replication timeout) ms, but is satisfied after Y
(throttling timeout), should we still set the error code or not? I think it
is OK to just set NO_ERROR but we need to document such cases clearly for
quota actions mixed with ack = -1.

Guozhang

On Wed, Apr 15, 2015 at 4:23 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Thanks for the review Guozhang.

 1. Agreed.

 2. This proposal actually waits for the maximum of the 2 timeouts. This
 reduces implementation complexity at the cost of waiting longer than
 strictly needed for quotas. Note that this is only for the case where
 acks=-1.

 However we can solve this if it is a significant concern by adding watcher
 keys for all partitions (only if acks=-1). These are the keys we would
 normally add while waiting for acknowledgements. We can change the
 tryComplete() function to return false until 'quota_timeout' time has
 elapsed AND all the acknowledgements have been received.
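
 As a rough illustration of that rule (Java is used here only for illustration; the
 real purgatory code is Scala, and the names below are assumptions, not actual code):

     // Rough illustration of the completion rule described above: a throttled acks=-1
     // produce request completes only after the quota delay has elapsed AND all
     // acknowledgements have arrived; the overall expiration is the max of the two timeouts.
     class ThrottledProduceSketch {
         private final long createdMs;
         private final long quotaDelayMs;
         private final long replicationTimeoutMs;

         ThrottledProduceSketch(long createdMs, long quotaDelayMs, long replicationTimeoutMs) {
             this.createdMs = createdMs;
             this.quotaDelayMs = quotaDelayMs;
             this.replicationTimeoutMs = replicationTimeoutMs;
         }

         boolean tryComplete(long nowMs, boolean allAcksReceived) {
             // never complete before the throttle delay, even if the acks are already in
             return (nowMs - createdMs >= quotaDelayMs) && allAcksReceived;
         }

         boolean isExpired(long nowMs) {
             return nowMs - createdMs >= Math.max(quotaDelayMs, replicationTimeoutMs);
         }
     }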

 Thanks,
 Aditya
 
 From: Guozhang Wang [wangg...@gmail.com]
 Sent: Wednesday, April 15, 2015 3:42 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 Thanks for the summary. A few comments below:

 1. Say a produce request has replication timeout X, and upon finishing the
 local append it is determined to be throttled Y ms where Y > X, then after
 it has timed out in the purgatory after Y ms we should still check if the
 #.acks has been fulfilled in order to set the correct error codes in the
 response.

 2. I think it is actually common that the calculated throttle time Y is
 less than the replication timeout X, which will be a tricky case since we
 need to make sure 1) the request is held in the purgatory for at least Y
 ms, 2) after Y ms elapsed, if the #.acks has been fulfilled within X ms then set
 no-error-code and return immediately, 3) after X ms elapsed, set
 timeout-error-code and return.

 Guozhang

 On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  This is an implementation proposal for delaying requests in quotas using
  the current purgatory. I'll discuss the usage for produce and fetch
  requests separately.
 
  1. Delayed Produce Requests - Here, the proposal is basically to reuse
  DelayedProduce objects and insert them into the purgatory with no watcher
  keys if the request is being throttled. The timeout used in the request
  should be the Max(quota_delay_time, replication_timeout).
  In most cases, the quota timeout should be greater than the existing
  timeout but in order to be safe, we can use the maximum of these values.
  Having no watch keys will allow the operation to be enqueued directly
 into
  the timer and will not add any overhead in terms of watching keys (which
  was a concern). In this case, having watch keys is not beneficial since
 the
  operation must be delayed for a fixed amount of time and there is no
  possibility for the operation to complete before the timeout i.e.
  tryComplete() can never return true before the timeout. On timeout, since
  the operation is a TimerTask, the timer will call run() which calls
  onComplete().
  In onComplete, the DelayedProduce can repeat the check in tryComplete()
  (only if acks=-1 whether all replicas fetched up to a certain offset) and
  return the response immediately.
 
  Code will be structured as follows in ReplicaManager:appendMessages()
 
  if(isThrottled) {
    produce = new DelayedProduce(timeout)
    purgatory.tryCompleteElseWatch(produce, Seq())
  }
  else if(delayedRequestRequired()) {
   // Insert into purgatory with watched keys for unthrottled requests
  }
 
  In this proposal, we avoid adding unnecessary watches because there is no
  possibility of early completion and this avoids any potential performance
  penalties we were concerned about earlier.
 
  2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the
  DelayedFetch objects and insert them into the purgatory with no watcher
  keys if the request is throttled. Timeout used is the
 Max(quota_delay_time,
  max_wait_timeout). Having no watch keys provides the same benefits as
  described above. Upon timeout, the onComplete() is called and the
 operation
  proceeds normally i.e. perform a readFromLocalLog and 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-16 Thread Jun Rao
The quota check for the fetch request is a bit different from the produce
request. I assume that for the fetch request, we will first get an
estimated fetch response size to do the quota check. There are two things
to think about. First, when we actually send the response, we probably
don't want to record the metric again since it will double count. Second,
the bytes that the fetch response actually sends could be more than the
estimate. This means that the metric may not be 100% accurate. We may be
able to limit the fetch size of each partition to what's in the original
estimate.

For the produce request, I was thinking that another way to do this is to
first figure out the quota_timeout. Then wait in Purgatory for
quota_timeout with no key. If the request is not satisfied in quota_timeout
and (request_timeout > quota_timeout), wait in Purgatory for
(request_timeout - quota_timeout) with the original keys.

Thanks,

Jun

On Tue, Apr 14, 2015 at 5:01 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 This is an implementation proposal for delaying requests in quotas using
 the current purgatory. I'll discuss the usage for produce and fetch
 requests separately.

 1. Delayed Produce Requests - Here, the proposal is basically to reuse
 DelayedProduce objects and insert them into the purgatory with no watcher
 keys if the request is being throttled. The timeout used in the request
 should be the Max(quota_delay_time, replication_timeout).
 In most cases, the quota timeout should be greater than the existing
 timeout but in order to be safe, we can use the maximum of these values.
 Having no watch keys will allow the operation to be enqueued directly into
 the timer and will not add any overhead in terms of watching keys (which
 was a concern). In this case, having watch keys is not beneficial since the
 operation must be delayed for a fixed amount of time and there is no
 possibility for the operation to complete before the timeout i.e.
 tryComplete() can never return true before the timeout. On timeout, since
 the operation is a TimerTask, the timer will call run() which calls
 onComplete().
 In onComplete, the DelayedProduce can repeat the check in tryComplete()
 (only if acks=-1 whether all replicas fetched up to a certain offset) and
 return the response immediately.

 Code will be structured as follows in ReplicaManager:appendMessages()

 if(isThrottled) {
   produce = new DelayedProduce(timeout)
   purgatory.tryCompleteElseWatch(produce, Seq())
 }
 else if(delayedRequestRequired()) {
  // Insert into purgatory with watched keys for unthrottled requests
 }

 In this proposal, we avoid adding unnecessary watches because there is no
 possibility of early completion and this avoids any potential performance
 penalties we were concerned about earlier.

 2. Delayed Fetch Requests - Similarly, the proposal here is to reuse the
 DelayedFetch objects and insert them into the purgatory with no watcher
 keys if the request is throttled. Timeout used is the Max(quota_delay_time,
 max_wait_timeout). Having no watch keys provides the same benefits as
 described above. Upon timeout, the onComplete() is called and the operation
 proceeds normally i.e. perform a readFromLocalLog and return a response.
 The caveat here is that if the request is throttled but the throttle time
 is less than the max_wait timeout on the fetch request, the request will be
 delayed to a Max(quota_delay_time, max_wait_timeout). This may be more than
 strictly necessary (since we are not watching for
 satisfaction on any keys).

 I added some testcases to DelayedOperationTest to verify that it is
 possible to schedule operations with no watcher keys. By inserting elements
 with no watch keys, the purgatory simply becomes a delay queue. It may also
 make sense to add a new API to the purgatory called
 delayFor() that basically accepts an operation without any watch keys
 (Thanks for the suggestion Joel).

 Thoughts?

 Thanks,
 Aditya

 
 From: Guozhang Wang [wangg...@gmail.com]
 Sent: Monday, April 13, 2015 7:27 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 I think KAFKA-2063 (bounding fetch response) is still under discussion, and
 may not get in in time with KAFKA-1927.

 On Thu, Apr 9, 2015 at 4:49 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  I think it's reasonable to batch the protocol changes together. In
  addition to the protocol changes, is someone actively driving the server
  side changes/KIP process for KAFKA-2063?
 
  Thanks,
  Aditya
 
  
  From: Jun Rao [j...@confluent.io]
  Sent: Thursday, April 09, 2015 8:59 AM
  To: dev@kafka.apache.org
  Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas
 
  Since we are also thinking about evolving the fetch request protocol in
  KAFKA-2063 (bound fetch response size), perhaps it's worth thinking
 through
  if we can just evolve the protocol once.
 
  Thanks,
 
  

Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Steven Wu


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  Looks good, left a few comments.
  
  KafkaConsumer suffers from this same problem. Patching that should be 
  pretty much identical -- any chance you could extend this to cover that as 
  well?

sure. I can extend this to KafkaConsumer later.


- Steven


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


On April 16, 2015, 5:44 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 16, 2015, 5:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 add a unit test file
 
 
 changes based on Ewen's review feedbacks
 
 
 fix capitalization in error log
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-16 Thread Jiangjie Qin
Hi Guozhang,

By implicit timeout for close() and flush(), I meant that currently we
don’t have an explicit timeout for close() or flush() when a broker is
down, so they can take pretty long, up to the TCP timeout, which is hours as you
mentioned. With the client side request timeout, the waiting time would be
sort of bounded by the request timeout.

And I agree we’d better change the TIMEOUT_CONFIG to
REPLICATION_TIMEOUT_CONFIG to avoid confusion.
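
Just to make the naming concrete, something like the following is what the proposal
would look like from a user's perspective (the property names below reflect the
proposal in this thread only, not a released API, and the values are placeholders):

    import java.util.Properties;

    public class ProposedTimeoutConfigs {
        public static void main(String[] args) {
            // Property names follow the proposal in this thread, not a released Kafka API.
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder broker address
            props.put("replication.timeout.ms", "30000");     // proposed rename of timeout.ms (broker-side ack wait)
            props.put("request.timeout.ms", "15000");         // proposed new client-side request timeout
            System.out.println(props);
        }
    }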

Thanks.

Jiangjie (Becket) Qin

On 4/15/15, 10:38 PM, Guozhang Wang wangg...@gmail.com wrote:

Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be
detected by the selector, but rather will only be revealed upon TCP
timeout, which could be hours.

A couple of comments on the wiki:

1. For KafkaProducer.close() and KafkaProducer.flush() we need the request
timeout as implicit timeout. I am not very clear what this means?

2. Currently the producer already has a TIMEOUT_CONFIG which should
really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
it will change the config names but will reduce confusion moving forward.


Guozhang


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Checked the code again. It seems that the disconnected channel is not
 detected by selector as expected.

 Currently we are depending on the
 o.a.k.common.network.Selector.disconnected set to see if we need to do
 something for a disconnected channel.
 However Selector.disconnected set is only updated when:
 1. A write/read/connect to channel failed.
 2. A Key is canceled
 However, when a broker goes down before it sends back the response, the
 client seems not to be able to detect this failure.

 I did a simple test below:
 1. Run a selector on one machine and an echo server on another machine.
 Connect a selector to an echo server
 2. Send a message to the echo server using the selector, then let the selector
 poll() every 10 seconds.
 3. After the server received the message, unplug the cable on the echo server.
 4. After waiting for 45 min, the selector still had not detected the
 network failure.
 lsof on the selector machine shows that the TCP connection is still considered
 ESTABLISHED.

 I’m not sure what we should expect from java.nio.channels.Selector in this
 case. According to the documentation, the selector does
 not verify the status of the associated channel. In my test case it looks
 even worse in that the OS did not consider the socket to be disconnected.

 Anyway, it seems adding the client side request timeout is necessary. I’ve
 updated the KIP page to clarify the problem we want to solve according to
 Ewen’s comments.

 Thanks.

 Jiangjie (Becket) Qin

 On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:
 
  Hi Ewen, thanks for the comments. Very good points! Please see
replies
  inline.
 
 
  On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  Jiangjie,
  
  Great start. I have a couple of comments.
  
  Under the motivation section, is it really true that the request
will
  never
  be completed? Presumably if the broker goes down the connection
will be
  severed, at worst by a TCP timeout, which should clean up the
 connection
  and any outstanding requests, right? I think the real reason we
need a
  different timeout is that the default TCP timeouts are ridiculously
 long
  in
  this context.
  Yes, when the broker is completely down the request should be cleared as you
  said. In the case we encountered, it looks like the broker was just not
  responding but the TCP connection was still alive.
 
 
 Ok, that makes sense.
 
 
 
  
  My second question is about whether this is the right level to
tackle
 the
  issue/what user-facing changes need to be made. A related problem
came
 up
  in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
  records
  get stuck indefinitely because there's no client-side timeout. This
KIP
  wouldn't fix that problem or any problems caused by lack of
 connectivity
  since this would only apply to in flight requests, which by
definition
  must
  have been sent on an active connection.
  
  I suspect both types of problems probably need to be addressed
 separately
  by introducing explicit timeouts. However, because the settings
 introduced
  here are very much about the internal implementations of the
clients,
 I'm
  wondering if this even needs to be a user-facing setting, especially
 if we
  have to add other timeouts anyway. For example, would a fixed,
generous
  value that's still much shorter than a TCP timeout, say 15s, be good
  enough? If other timeouts would allow, for example, the clients to
  properly
  exit even if requests have not hit their timeout, then what's the
 benefit
  of being able to configure the request-level timeout?
  That is a very good point. We have three 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)

2015-04-16 Thread Jun Rao
1. For the lags, we can add a new field lags per partition. For each replica
that's not in the isr, it will return the replica id and the lag in
messages. Also, if TMR is sent to a non-leader, the response can just
include an empty array for isr and lags.

2. So, we will just return a topic level error for the duplicated topics,
right?

3. Yes, it's true that today, one can specify both partitions and
replicaAssignment in the TopicCommand. However, partitions is actually
ignored. So, it will be clearer if we don't allow users to do this.

4. How many specific error codes like InvalidPartitions and InvalidReplicas
are needed? If it's not that many, giving out more specific error will be
useful for non-java clients.

Thanks,

Jun


On Wed, Apr 15, 2015 at 10:23 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Guys,

 Thanks for the discussion!

 Summary:

 1. Q: How KAFKA-1367 (isr is inconsistent in brokers' metadata cache) can
 affect implementation?
 A: We can fix this issue for the leading broker - the ReplicaManager (or
 Partition) component should have an accurate isr list. Then, with the leading
 broker having correct info, to do a describe-topic we will need to determine
 the leading brokers for the partitions and ask those for a correct isr list.
 Also, we should consider adding lag information to TMR for each
 follower for
 partition reassignment, as Jun suggested above.

 2. Q: What if user adds different alter commands for the same topic in
 scope
  of one batch request?
 A: Because of the async nature of AlterTopicRequest it will be very
 hard then
 to assemble the expected (in terms of checking whether request is
 complete)
 result if we let users do this. Also it will be very confusing. It
 was proposed not to
 let users do this (probably add new Error for such cases).

 3. Q: AlterTopicRequest semantics: now when we merged AlterTopic and
 ReassingPartitons in which order AlterTopic fields should be
 resolved?
 A: This item is not clear. There was a proposal to let user change only
 one thing at a time, e.g. specify either new Replicas, or
 ReplicaAssignment.
 This can be a simple solution, but it's a very strict rule. E.g.
 currently with
 TopicCommand user can increase nr of partitions and define replica
 assignment
 for newly added partitions. Taking into account item 2. this will
 be even harder
 for user to achieve this.

 4. Q: Do we need such accurate errors returned from the server:
 InvalidArgumentPartitions,
  InvalidArgumentReplicas etc.
 A: I started implementation to add proposed error codes and now I think
 probably
 InvalidArgumentError should be sufficient. We can do simple
 validations on
 the client side (e.g. AdminClient can ensure nr of partitions
 argument is positive),
 others - which can be covered only on server (probably invalid
 topic config,
 replica assignment includes dead broker etc) - will be done on
 server, and in case
 of invalid argument we will return InvalidArgumentError without
 specifying the
 concrete field.

 It'd be great if we could cover these remaining issues, looks like they are
 minor,
 at least related to specific messages, not the overall protocol. - I think
 with that I can
 update confluence page and update patch to reflect all discussed items.
 This patch
 will probably include Wire protocol messages and server-side code to handle
 new
 requests. AdminClient and cli-tool implementation can be the next step.

 Thanks,
 Andrii Biletskyi

 On Wed, Apr 15, 2015 at 7:26 PM, Jun Rao j...@confluent.io wrote:

  Andrii,
 
  500. I think what you suggested also sounds reasonable. Since ISR is only
  maintained accurately at the leader, TMR can return ISR if the broker is
  the leader of a partition. Otherwise, we can return an empty ISR. For
  partition reassignment, it would be useful to know the lag of each
  follower. Again, the leader knows this info. We can probably include that
  info in TMR as well.
 
  300. I think it's probably reasonable to restrict AlterTopicRequest to
  change only one thing at a time, i.e., either partitions, replicas,
 replica
  assignment or config.
 
  Thanks,
 
  Jun
 
  On Mon, Apr 13, 2015 at 10:56 AM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Jun,
  
   404. Great, thanks!
  
   500. If I understand correctly KAFKA-1367 says ISR part of TMR may
   be inconsistent. If so, then I believe all admin commands but
  describeTopic
   are not affected. Let me emphasize that it's about AdminClient
  operations,
   not about Wire Protocol requests. What I mean:
   To verify AdminClient.createTopic we will need (consistent) 'topics'
 set
   from TMR (we don't need isr)
   To verify alterTopic - again, probably 'topics' and 'assigned
 replicas' +
   configs
   To verify deleteTopic - only 'topics'
   To verify 

Re: Review Request 33239: Patch for KAFKA-2126

2015-04-16 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33239/#review80394
---

Ship it!


Ship It!

- Jiangjie Qin


On April 15, 2015, 8:19 p.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33239/
 ---
 
 (Updated April 15, 2015, 8:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2126
 https://issues.apache.org/jira/browse/KAFKA-2126
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2126: Configure automatically instantiated deserializers in new 
 consumer.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 21243345311a106f0802ce96c026ba6e815ccf99 
 
 Diff: https://reviews.apache.org/r/33239/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: [DISCUSSION] KIP-11: ACL Management

2015-04-16 Thread Jun Rao
Hi, Gwen,

What you suggested seems reasonable. I guess we will need the Principal,
Privilege pair and the Resource in grant() and revoke()?

Is the Hive authorization api the following? It's weird that it takes user
in checkPermissions(), but not in authorize().

http://hive.apache.org/javadocs/r0.11.0/api/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.html

I was imagining that a default implementation could be similar to how we
store offsets in Kafka. Basically, store all acls in a special topic with
compact retention. Then, every broker will build an in-memory cache off
that topic.

Another thing that we haven't discussed so far is how to manage ACLs across
different mirrored Kafka clusters. Let's say you use mirror maker to mirror
all topics from cluster A to cluster B. You probably want to have exactly
the same ACL on both A and B. It would be good if the ACL can be set up
just once. If we use the above default implementation, since the ACL topic
is mirrored too, the ACL will be propagated automatically.

Thanks,

Jun


On Thu, Apr 16, 2015 at 9:44 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi Kafka Authorization Fans,

 I'm starting a new thread on a specific sub-topic of KIP-11, since
 this is a bit long :)

 Currently KIP-11, as I understand it, proposes:
 * Authorizers are pluggable, with Kafka providing DefaultAuthorizer.
 * Kafka tools allow adding / managing ACLs.
 * Those ACLs are stored in ZK and cached in a new TopicCache
 * Authorizers can either use the ACLs defined and stored in Kafka, or
 define and use their own.

 I am concerned about two possible issues with this design:
 1. Separation of concerns - only authorizers should worry about ACLs,
 and therefore the less code for ACLs that exist in Kafka core, the
 better.
 2. User confusion - It sounded like we can define ACLs in Kafka itself
 but authorizers can also define their own, so kafka-topics
 --describe may show an ACL different than the one in use. This can be
 super confusing for admins.

 My alternative suggestion:
 * Authorizer API will include:
  grantPrivilege(List<Principals>, List<Privilege>)
  revokePrivilege(List<Principals>, List<Privilege>),
  getPrivilegesByPrincipal(Principal, Resource)
  
  (The exact API can be discussed in detail, but you get the idea)
 * Kafka tools will simply invoke these APIs when topics are added /
 modified / described.
 * Each authorizer (including the default one) will be responsible for
 storing, caching and using those ACLs.

 This way, we keep almost all ACL code with the Authorizer, where it
 belongs and users get a nice unified interface that reflects what is
 actually getting used in the system.
 This is pretty much how Sqoop and Hive implement their authorization APIs.

 What do you think?

 Gwen
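
 In Java terms, the surface described above would look roughly like this (all names
 are placeholders taken from the email, not an agreed-upon Kafka API):

     import java.util.List;

     // Placeholder rendering of the proposed Authorizer surface; names come from the
     // email above and are not an agreed-upon Kafka API.
     interface Authorizer {
         void grantPrivilege(List<Principal> principals, List<Privilege> privileges);
         void revokePrivilege(List<Principal> principals, List<Privilege> privileges);
         List<Privilege> getPrivilegesByPrincipal(Principal principal, Resource resource);
     }

     // Minimal placeholder types so the sketch is self-contained.
     class Principal { String name; }
     class Privilege { String operation; }
     class Resource { String topic; }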



[jira] [Created] (KAFKA-2129) Consumer could make multiple concurrent metadata requests

2015-04-16 Thread Tim Brooks (JIRA)
Tim Brooks created KAFKA-2129:
-

 Summary: Consumer could make multiple concurrent metadata requests
 Key: KAFKA-2129
 URL: https://issues.apache.org/jira/browse/KAFKA-2129
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Tim Brooks
Priority: Minor


The NetworkClient's metadataFetchInProgress is neither volatile nor atomic. 
This field protects against multiple metadata requests being made and is read on 
poll() on the NetworkClient. It is written to when a request is initiated.

This is fine for the producer, which seems to have one writing thread. The 
KafkaConsumer's poll() method is synchronized, so there will not be more than 
one writer entering from there. However, the NetworkClient's poll() method is 
also accessed from the Consumer's partitionsFor() method, which could be called 
by a separate thread.
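
One possible direction (an assumption for illustration, not the committed fix) is to
guard the flag with an AtomicBoolean so concurrent callers cannot both start a request:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of one possible fix (an assumption, not the committed change): make the
    // in-progress flag atomic so only one caller wins the right to send a metadata request.
    class MetadataRequestGate {
        private final AtomicBoolean metadataFetchInProgress = new AtomicBoolean(false);

        boolean tryStartMetadataRequest() {
            // returns true for exactly one caller until the request completes
            return metadataFetchInProgress.compareAndSet(false, true);
        }

        void metadataRequestCompleted() {
            metadataFetchInProgress.set(false);
        }
    }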



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33242: Patch for KAFKA-2121

2015-04-16 Thread Ewen Cheslack-Postava


 On April 16, 2015, 5:29 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 548
  https://reviews.apache.org/r/33242/diff/2/?file=931792#file931792line548
 
  One idea for making this less verbose and redundant: make all of these 
  classes implement Closeable so we can just write one utility method for 
  trying to close something and catching the exception.
 
 Steven Wu wrote:
 yes. I thought about it. it may break binary compatibility, e.g. 
 Serializer. Sender and Metrics classes are probably only used internally. let 
 me know your thoughts.

I'm pretty sure it's fine, based on this:

"Changing the direct superclass or the set of direct superinterfaces of a class 
type will not break compatibility with pre-existing binaries, provided that the 
total set of superclasses or superinterfaces, respectively, of the class type 
loses no members."

from https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33242/#review80346
---


On April 16, 2015, 5:44 p.m., Steven Wu wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33242/
 ---
 
 (Updated April 16, 2015, 5:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2121
 https://issues.apache.org/jira/browse/KAFKA-2121
 
 
 Repository: kafka
 
 
 Description
 ---
 
 add a unit test file
 
 
 changes based on Ewen's review feedbacks
 
 
 fix capitalization in error log
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a 
   
 clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
  PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/33242/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Steven Wu
 




[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-16 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498431#comment-14498431
 ] 

Ewen Cheslack-Postava commented on KAFKA-2098:
--

[~alexcb] That works as long as you have a version of gradle that works with 
the project. The other thing the wrapper does is let the project specify the 
version of gradle to build with, which makes compatibility simpler -- if you 
have an older system-installed gradle, the wrapper lets you use a newer version 
just for that specific project. This makes builds more robust since they don't 
have to support every version of gradle that people might have installed.

Unfortunately the gradlew bootstrapping doesn't work with all versions, which 
is why Gradle recommends checking in the scripts and jars. For example, setting 
up Kafka for development on Ubuntu Trusty is kind of a pain right now because 
the version of Gradle (1.4) is too old to get past the bootstrapping phase, so 
you still need to download Gradle separately just to bootstrap the wrapper 
scripts.

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi
Assignee: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2.Attaching patch.Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: Kafka-trunk #464

2015-04-16 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/464/changes

Changes:

[junrao] kafka-2113; TestPurgatoryPerformance does not compile using IBM JDK; 
patched by Rajini Sivaram; reviewed by Yasuhiro Matsuda and Jun Rao

--
[...truncated 1245 lines...]
kafka.log.LogTest  testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest  testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.LogSegmentTest  testReadOnEmptySegment PASSED

kafka.log.LogSegmentTest  testReadBeforeFirstOffset PASSED

kafka.log.LogSegmentTest  testMaxOffset PASSED

kafka.log.LogSegmentTest  testReadAfterLast PASSED

kafka.log.LogSegmentTest  testReadFromGap PASSED

kafka.log.LogSegmentTest  testTruncate PASSED

kafka.log.LogSegmentTest  testTruncateFull PASSED

kafka.log.LogSegmentTest  testNextOffsetCalculation PASSED

kafka.log.LogSegmentTest  testChangeFileSuffixes PASSED

kafka.log.LogSegmentTest  testRecoveryFixesCorruptIndex PASSED

kafka.log.LogSegmentTest  testRecoveryWithCorruptMessage PASSED

kafka.log.LogConfigTest  testFromPropsDefaults PASSED

kafka.log.LogConfigTest  testFromPropsEmpty PASSED

kafka.log.LogConfigTest  testFromPropsToProps PASSED

kafka.log.LogConfigTest  testFromPropsInvalid PASSED

kafka.log.CleanerTest  testCleanSegments PASSED

kafka.log.CleanerTest  testCleaningWithDeletes PASSED

kafka.log.CleanerTest  testCleaningWithUnkeyedMessages PASSED

kafka.log.CleanerTest  testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest  testSegmentGrouping PASSED

kafka.log.CleanerTest  testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest  testBuildOffsetMap PASSED

kafka.log.FileMessageSetTest  testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest  testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest  testSizeInBytes PASSED

kafka.log.FileMessageSetTest  testWriteTo PASSED

kafka.log.FileMessageSetTest  testTruncate PASSED

kafka.log.FileMessageSetTest  testFileSize PASSED

kafka.log.FileMessageSetTest  testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest  testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest  testRead PASSED

kafka.log.FileMessageSetTest  testSearch PASSED

kafka.log.FileMessageSetTest  testIteratorWithLimits PASSED

kafka.log.OffsetMapTest  testBasicValidation PASSED

kafka.log.OffsetMapTest  testClear PASSED

kafka.log.OffsetIndexTest  truncate PASSED

kafka.log.OffsetIndexTest  randomLookupTest PASSED

kafka.log.OffsetIndexTest  lookupExtremeCases PASSED

kafka.log.OffsetIndexTest  appendTooMany PASSED

kafka.log.OffsetIndexTest  appendOutOfOrder PASSED

kafka.log.OffsetIndexTest  testReopen PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[19] PASSED

kafka.log.LogManagerTest  testCreateLog PASSED

kafka.log.LogManagerTest  testGetNonExistentLog PASSED

kafka.log.LogManagerTest  testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest  testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest  testTimeBasedFlush PASSED

kafka.log.LogManagerTest  testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest  testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest  testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest  testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest  testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogCleanerIntegrationTest  cleanerTest PASSED


Re: [DISCUSSION] KIP-11: ACL Management

2015-04-16 Thread Gari Singh
Hi Gwen -

I tend to agree with your proposal.  As you mention the exact details /
interfaces would need to be worked out, but this would be more in line with
how JAAS and JACC work in the Java / JEE worlds.

I do think that it might be nice to include / provide some helper APIs /
methods for caching (so that people don't accidentally cause OOM
situations) and it would be convenient to have some type of distribution /
synchronization mechanisms for local / file based ACL lists  (again perhaps
it would be possible to provide a standard helper method for using
Zookeeper?).

-- Gari


On Thu, Apr 16, 2015 at 11:44 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Hi Kafka Authorization Fans,

 I'm starting a new thread on a specific sub-topic of KIP-11, since
 this is a bit long :)

 Currently KIP-11, as I understand it, proposes:
 * Authorizers are pluggable, with Kafka providing DefaultAuthorizer.
 * Kafka tools allow adding / managing ACLs.
 * Those ACLs are stored in ZK and cached in a new TopicCache
 * Authorizers can either use the ACLs defined and stored in Kafka, or
 define and use their own.

 I am concerned about two possible issues with this design:
 1. Separation of concerns - only authorizers should worry about ACLs,
 and therefore the less code for ACLs that exist in Kafka core, the
 better.
 2. User confusion - It sounded like we can define ACLs in Kafka itself
 but authorizers can also define their own, so kafka-topics
 --describe may show an ACL different than the one in use. This can be
 super confusing for admins.

 My alternative suggestion:
 * Authorizer API will include:
  grantPrivilege(List<Principals>, List<Privilege>)
  revokePrivilege(List<Principals>, List<Privilege>),
  getPrivilegesByPrincipal(Principal, Resource)
  
  (The exact API can be discussed in detail, but you get the idea)
 * Kafka tools will simply invoke these APIs when topics are added /
 modified / described.
 * Each authorizer (including the default one) will be responsible for
 storing, caching and using those ACLs.

 This way, we keep almost all ACL code with the Authorizer, where it
 belongs and users get a nice unified interface that reflects what is
 actually getting used in the system.
 This is pretty much how Sqoop and Hive implement their authorization APIs.

 What do you think?

 Gwen



[jira] [Updated] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-04-16 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1660:

Attachment: KAFKA-1660_2015-04-16_11:35:37.patch

 Ability to call close() with a timeout on the Java Kafka Producer. 
 ---

 Key: KAFKA-1660
 URL: https://issues.apache.org/jira/browse/KAFKA-1660
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 0.8.2.0
Reporter: Andrew Stein
Assignee: Jiangjie Qin
 Fix For: 0.8.3

 Attachments: KAFKA-1660.patch, KAFKA-1660.patch, 
 KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, 
 KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, 
 KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, 
 KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, 
 KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch


 I would like the ability to call {{close}} with a timeout on the Java 
 Client's KafkaProducer.
 h6. Workaround
 Currently, it is possible to ensure that {{close}} will return quickly by 
 first doing a {{future.get(timeout)}} on the last future produced on each 
 partition, but this means that the user has to define the partitions up front 
 at the time of {{send}} and track the returned {{future}}'s
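
 A sketch of that workaround (the caller is assumed to have tracked the last Future
 returned by send() for each partition; this is illustrative, not part of the patch):

     import java.util.List;
     import java.util.concurrent.Future;
     import java.util.concurrent.TimeUnit;
     import org.apache.kafka.clients.producer.KafkaProducer;
     import org.apache.kafka.clients.producer.RecordMetadata;

     // Sketch of the workaround described above: bound the wait on outstanding sends by
     // waiting on the last Future per partition before calling close().
     class BoundedClose {
         static void closeWithBoundedWait(KafkaProducer<byte[], byte[]> producer,
                                          List<Future<RecordMetadata>> lastFuturePerPartition,
                                          long timeoutMs) {
             for (Future<RecordMetadata> f : lastFuturePerPartition) {
                 try {
                     f.get(timeoutMs, TimeUnit.MILLISECONDS);   // wait at most timeoutMs per partition
                 } catch (Exception e) {
                     // timed out or failed; proceed to close anyway
                 }
             }
             producer.close();
         }
     }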



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1660) Ability to call close() with a timeout on the Java Kafka Producer.

2015-04-16 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498469#comment-14498469
 ] 

Jiangjie Qin commented on KAFKA-1660:
-

Updated reviewboard https://reviews.apache.org/r/31850/diff/
 against branch origin/trunk

 Ability to call close() with a timeout on the Java Kafka Producer. 
 ---

 Key: KAFKA-1660
 URL: https://issues.apache.org/jira/browse/KAFKA-1660
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 0.8.2.0
Reporter: Andrew Stein
Assignee: Jiangjie Qin
 Fix For: 0.8.3

 Attachments: KAFKA-1660.patch, KAFKA-1660.patch, 
 KAFKA-1660_2015-02-17_16:41:19.patch, KAFKA-1660_2015-03-02_10:41:49.patch, 
 KAFKA-1660_2015-03-08_21:14:50.patch, KAFKA-1660_2015-03-09_12:56:39.patch, 
 KAFKA-1660_2015-03-25_10:55:42.patch, KAFKA-1660_2015-03-27_16:35:42.patch, 
 KAFKA-1660_2015-04-07_18:18:40.patch, KAFKA-1660_2015-04-08_14:01:12.patch, 
 KAFKA-1660_2015-04-10_15:08:54.patch, KAFKA-1660_2015-04-16_11:35:37.patch


 I would like the ability to call {{close}} with a timeout on the Java 
 Client's KafkaProducer.
 h6. Workaround
 Currently, it is possible to ensure that {{close}} will return quickly by 
 first doing a {{future.get(timeout)}} on the last future produced on each 
 partition, but this means that the user has to define the partitions up front 
 at the time of {{send}} and track the returned {{future}}s.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31850: Patch for KAFKA-1660

2015-04-16 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
---

(Updated April 16, 2015, 6:35 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
---

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Addressed Jay's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
c2fdc23239bd2196cd912c3d121b591f21393eab 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
---

Unit tests passed.


Thanks,

Jiangjie Qin



Re: Review Request 31850: Patch for KAFKA-1660

2015-04-16 Thread Jiangjie Qin


 On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
   line 155
  https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line155
 
  This scheme is clever but non-obvious, is there a simpler way?
 
 Jiangjie Qin wrote:
 I'm not sure if there is a simpler way. Maybe we can review the current 
 approach again and see if we can simplify it.
 
 The goals we want to achieve here are:
 1. When abortIncompleteBatches() finishes, no more messages should be 
 appended. 
 2. Make sure that when hasUnsent() returns false, it does not miss any batch.
 
 The current solutions for both depend on setting the close flag first.
 To achieve (1), the implementation sets the close flag first and then waits 
 for all ongoing appends (if any) to finish.
 To achieve (2), the implementation synchronizes on the deque. When an 
 append grabs the deque lock, it first checks whether the close flag is set. 
 If it is set, hasUnsent() might already have checked this deque, so it is no 
 longer safe to append a new batch. Otherwise it is safe to append a new batch.

I thought about this again and went back to using the reader-writer lock, with a 
small modification: tryLock. Hopefully this makes things cleaner. The idea is 
exactly the same as the current approach, but the code is less confusing. I put 
some reasoning below; please let me know if you have any suggestions.

Essentially we want to make sure that no messages or batches are left behind after 
1. abortIncompleteBatches() has been called, and 
2. hasUnsent() has returned false (hasUnsent() only cares about batches, not messages).
This means we need to make sure append() cannot proceed after these two events.

We set the close flag before these two events to reject new appends. The key 
issue we want to solve here is how to deal with threads that are already appending.

To detect whether there are appending threads, we need either an explicit 
flag or an exclusive lock (the reader-writer lock, as in the previous patch).
If appends are in progress, we have two options:
A. Fail the append.
B. Wait until the appends are done.

It is a little difficult to fail the append because it is hard to know which 
step the append is in. hasUnsent() uses the deque lock to make sure that no new 
batch can be added to a deque after hasUnsent() has checked it. That's why 
append() needs to check the close flag again after grabbing the deque lock.
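
A toy sketch of that check (the class and names here are mine, not the actual 
RecordAccumulator code):

{code}
import java.util.ArrayDeque;
import java.util.Deque;

// Toy stand-in for one partition's batch deque, showing the close-flag check
// performed under the deque lock.
class ToyAccumulator {
    private final Deque<String> deque = new ArrayDeque<String>();
    private volatile boolean closed = false;

    // Called before hasUnsent()/abortIncompleteBatches() scan the deques.
    void close() {
        closed = true;
    }

    void append(String batch) {
        synchronized (deque) {
            // Once the flag is visible here, a scan may already have passed over
            // this deque, so a newly added batch could be silently missed.
            if (closed)
                throw new IllegalStateException("Cannot append after close.");
            deque.add(batch);
        }
    }

    boolean hasUnsent() {
        synchronized (deque) {
            return !deque.isEmpty();
        }
    }
}
{code}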

For abortIncompleteBatches(), we currently use option B. One tricky thing here 
is that a thread might block on a full buffer while abortIncompleteBatches() is 
waiting. This would lead to a deadlock if abortIncompleteBatches() is called 
from the sender thread, because the sender thread would release no memory while 
it waits for an append that is itself waiting for memory. So we need to keep 
the sender thread running to release memory. To solve this, instead of blocking 
on acquiring the write lock, we use tryLock: if it returns false, we keep 
failing batches to release memory until we grab the write lock successfully, 
then abort batches one last time.
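
Roughly, the abort loop described above could look like this (assumed names, 
not the actual patch):

{code}
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: producer threads hold appendLock.readLock() for the duration of each
// append(); the abort path uses tryLock on the write lock so it never blocks
// while appends are still waiting for memory.
class AbortSketch {
    private final ReentrantReadWriteLock appendLock = new ReentrantReadWriteLock();

    void abortIncompleteBatches() {
        // Failing batches frees buffer memory, which unblocks any append that is
        // stuck on a full buffer; repeat until no append holds the read lock.
        do {
            abortBatches();
        } while (!appendLock.writeLock().tryLock());
        try {
            abortBatches(); // final pass: no append can be in flight now
        } finally {
            appendLock.writeLock().unlock();
        }
    }

    private void abortBatches() {
        // Fail all currently queued batches and release their memory (omitted).
    }
}
{code}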

What do you think about this approach?


 On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, 
  line 539
  https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line539
 
  It wouldn't block forever, that isn't correct, it would just block for 
  the period of time they specified.
 
 Jiangjie Qin wrote:
 We are saying we will call close(0) instead of sender thread call 
 close(timeout). And we do this to *avoid* blocking forever.

I removed the comment to avoid confusion.


- Jiangjie


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79813
---


On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31850/
 ---
 
 (Updated April 16, 2015, 6:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1660
 https://issues.apache.org/jira/browse/KAFKA-1660
 
 
 Repository: kafka
 
 
 Description
 ---
 
 A minor fix.
 
 
 Incorporated Guozhang's comments.
 
 
 Modify according to the latest conclusion.
 
 
 Patch for the finally passed KIP-15
 
 
 Addressed Joel and Guozhang's comments.
 
 
 rebased on trunk
 
 
 Rebase on trunk
 
 
 Addressed Joel's comments.
 
 
 Addressed Joel's comments
 
 
 Addressed Jay's comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 b91e2c52ed0acb1faa85915097d97bafa28c413a