[jira] [Commented] (KAFKA-1298) Controlled shutdown tool doesn't seem to work out of the box

2015-03-14 Thread Grzegorz Dubicki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362047#comment-14362047
 ] 

Grzegorz Dubicki commented on KAFKA-1298:
-

I think that the controlled shutdown tools docs here 
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown
 should also be updated. I would do that myself but I don't have proper 
permissions in that Confluence.

 Controlled shutdown tool doesn't seem to work out of the box
 

 Key: KAFKA-1298
 URL: https://issues.apache.org/jira/browse/KAFKA-1298
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability
 Fix For: 0.8.2.0

 Attachments: KAFKA-1298.patch, KAFKA-1298.patch, 
 KAFKA-1298_2014-06-09_17:20:44.patch


 Download Kafka and try to use our shutdown tool. Got this:
 bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181 
 --broker 0
 [2014-03-06 16:58:23,636] ERROR Operation failed due to controller failure 
 (kafka.admin.ShutdownBroker$)
 java.io.IOException: Failed to retrieve RMIServer stub: 
 javax.naming.ServiceUnavailableException [Root exception is 
 java.rmi.ConnectException: Connection refused to host: 
 jkreps-mn.linkedin.biz; nested exception is: 
   java.net.ConnectException: Connection refused]
   at 
 javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:340)
   at 
 javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:249)
   at 
 kafka.admin.ShutdownBroker$.kafka$admin$ShutdownBroker$$invokeShutdown(ShutdownBroker.scala:56)
   at kafka.admin.ShutdownBroker$.main(ShutdownBroker.scala:109)
   at kafka.admin.ShutdownBroker.main(ShutdownBroker.scala)
 Caused by: javax.naming.ServiceUnavailableException [Root exception is 
 java.rmi.ConnectException: Connection refused to host: 
 jkreps-mn.linkedin.biz; nested exception is: 
   java.net.ConnectException: Connection refused]
   at 
 com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:101)
   at 
 com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:185)
   at javax.naming.InitialContext.lookup(InitialContext.java:392)
   at 
 javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1888)
   at 
 javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1858)
   at 
 javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:257)
   ... 4 more
 Caused by: java.rmi.ConnectException: Connection refused to host: 
 jkreps-mn.linkedin.biz; nested exception is: 
   java.net.ConnectException: Connection refused
   at sun.rmi.transport.tcp.TCPEndpoint.newSocket(TCPEndpoint.java:601)
   at 
 sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:198)
   at sun.rmi.transport.tcp.TCPChannel.newConnection(TCPChannel.java:184)
   at sun.rmi.server.UnicastRef.newCall(UnicastRef.java:322)
   at sun.rmi.registry.RegistryImpl_Stub.lookup(Unknown Source)
   at 
 com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:97)
   ... 9 more
 Caused by: java.net.ConnectException: Connection refused
   at java.net.PlainSocketImpl.socketConnect(Native Method)
   at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:382)
   at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:241)
   at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:228)
   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431)
   at java.net.Socket.connect(Socket.java:527)
   at java.net.Socket.connect(Socket.java:476)
   at java.net.Socket.init(Socket.java:373)
   at java.net.Socket.init(Socket.java:187)
   at 
 sun.rmi.transport.proxy.RMIDirectSocketFactory.createSocket(RMIDirectSocketFactory.java:22)
   at 
 sun.rmi.transport.proxy.RMIMasterSocketFactory.createSocket(RMIMasterSocketFactory.java:128)
   at sun.rmi.transport.tcp.TCPEndpoint.newSocket(TCPEndpoint.java:595)
   ... 14 more
 Oh god, RMI?!!!???
 Presumably this is because we stopped setting the JMX port by default. This 
 is good because setting the JMX port breaks the quickstart which requires 
 running multiple nodes on a single machine. The root cause imo is just using 
 RMI here instead of our regular RPC.
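 For context on the quickstart constraint mentioned above: kafka-run-class.sh only enables remote JMX when the JMX_PORT environment variable is set, and a single fixed default port cannot work for several brokers on one host. A sketch (the ports and config file names here are arbitrary examples):

```shell
# Running two brokers on one machine requires distinct JMX ports;
# a hard-coded default port would make the second broker fail to bind.
JMX_PORT=9999  bin/kafka-server-start.sh config/server-1.properties &
JMX_PORT=10000 bin/kafka-server-start.sh config/server-2.properties &
```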



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-14 Thread Jiangjie Qin
Hi Jay,

I have modified the KIP as you suggested. I think as long as we have a
consistent definition of timeout across the Kafka interfaces, there should
be no problem. I also agree it is better to make the producer block when
close() is called from the sender thread, so the user will notice something
went wrong.

Thanks.

Jiangjie (Becket) Qin

On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Jiangjie,

I think it is going to be very confusing that
  close(0) waits indefinitely and
  close(-1) waits for 0.
I understand this appears in other APIs, but it is a constant cause of
bugs. Let's not repeat that mistake.

Let's make close(0) wait for 0. We don't need a way to wait indefinitely,
as we already have close(), so having a magical constant for that is
redundant.

Calling close() from the I/O thread was already possible and would block
indefinitely. I think trying to silently change the behavior is probably
not right. I.e., if the user calls close() in the callback there is
actually some misunderstanding and they need to think more; silently
making this not block will hide the problem from them, which is the
opposite of what we want.

-Jay

On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Joe & Jay,

 Thanks for the comments on the voting thread. Since it seems we will
 probably have more discussion on this, I am just replying from the
 discussion thread here.
 I've updated the KIP page to make it less half-baked; apologies for
 the rush...

 The contract in the current KIP is:
   1. close() - wait until all requests are either sent or reach the
 request timeout.
   2. close(-1, TimeUnit.MILLISECONDS) - close immediately.
   3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait
 until all requests are sent or reach the request timeout.
   4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in
 5 milliseconds; if something goes wrong, just shut down the producer
 anyway, my callback will handle the failures.
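 The contract above can be sketched with a small stand-in class. This is only an illustration of the timeout semantics being proposed, not the actual KafkaProducer implementation; the class and method names are made up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for the producer: "pending" counts down when all
// outstanding sends complete; "forced" records a forced shutdown.
class SketchProducer {
    private final CountDownLatch pending = new CountDownLatch(1);
    volatile boolean forced = false;

    void completeAllSends() { pending.countDown(); }

    // close(<0): shut down immediately, dropping pending sends;
    // close(0):  like close(), wait with no close-side deadline;
    // close(n>0): wait at most n units, then force shutdown anyway.
    void close(long timeout, TimeUnit unit) {
        try {
            if (timeout < 0) {
                forced = true;                     // immediate close
            } else if (timeout == 0) {
                pending.await();                   // no close-side deadline
            } else if (!pending.await(timeout, unit)) {
                forced = true;                     // deadline hit: force it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```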

 About how we define what the timeout value stands for: I actually
 struggled a little bit when writing the patch. Intuitively, close(0)
 should mean immediately; however, the existing Java classes have the
 convention that timeout=0 means no timeout, i.e. never time out
 (Thread.join(0), Object.wait(0), etc.). So the dilemma is whether we
 follow intuition or follow the convention. What I chose is to follow the
 convention but document the interface so users are aware of the usage.
 The reason is that producer.close() is a public interface, so it might
 be better to follow the Java convention, whereas the selector is not a
 public interface used by users, so as long as it makes sense to us it is
 less of a problem to differ from the Java convention. That said, since
 consumer.poll(timeout) is also a public interface, I think it also makes
 sense to give producer.close() the same definition as
 consumer.poll(timeout).

 The main argument for keeping a timeout in close would be separating the
 close timeout from the request timeout, which probably makes sense. I
 would guess the request timeout would typically be long (e.g. 60
 seconds) because we might want to allow retries with backoff time. If we
 have multiple batches in the accumulator, in the worst case it could
 take up to several minutes to complete all the requests. But when we
 close a producer, we might not want to wait that long, as it might cause
 some other problem such as a deployment tool timeout.

 There is also a subtle difference between close(timeout) and
 flush(timeout). The only purpose of flush() is to write data to the
 broker, so it makes perfect sense to wait until the request timeout; I
 think that is why flush(timeout) looks strange. On the other hand, the
 top priority for close() is to close the producer rather than flush
 data, so close(timeout) gives a guarantee of bounded waiting for its
 main job.

 Sorry for the confusion about the forceClose flag. It is not a public
 interface. I mentioned it in the Proposed Changes section, which I
 thought was supposed to provide implementation details.

 Thanks again for all the comments and suggestions!

 Jiangjie (Becket) Qin

 On 3/10/15, 8:57 PM, Jiangjie Qin j...@linkedin.com wrote:

 The KIP page has been updated per Jay's comments.
 I'd like to initiate the voting process if no further comments are
 received by tomorrow.
 
 Jiangjie (Becket) Qin
 
 On 3/8/15, 9:45 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Hey Jiangjie,
 
 Can you capture the full motivation and use cases for the feature? This
 mentions your interest in having a way of aborting from inside the
 Callback, but it doesn't really explain that usage or why other people
 would want to do that. It also doesn't list the primary use case for
 having close with a bounded timeout, which was to avoid blocking too
 long on shutdown.
 
 -Jay
 
 
 
 On Sat, Mar 7, 2015 at 12:25 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
 wrote:
 
  Hi,
 
  I just created 

[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-03-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362111#comment-14362111
 ] 

Jun Rao commented on KAFKA-1926:


[~tongli], thanks for the patch. I made a pass over all the methods in CoreUtils. 
I recommend that we do the following.

1. The following methods are general purpose and should be moved to 
o.a.k.common.utils.
readBytes
loadProps
stackTrace
daemonThread
newThread
croak

2. The following methods don't seem to be used and can be removed: 
daemonThread(runnable: Runnable)
newThread(runnable: Runnable, daemon: Boolean)
readUnsignedInt(buffer: ByteBuffer)
writeUnsignedInt(buffer: ByteBuffer, value: Long)
equal
groupby
notNull
nullOrEmpty
createFile
asString
readProps

3. readString(): The only non-test usage is in DefaultEventHandler. The usage 
is actually incorrect since the payload may not be convertible to a string. 
Instead of using readString(), we should just use message.message.toString. We 
should also change Message.toString to the same as in Record.toString. Then, we 
can move readString() to TestUtils.

4. hashCode(): The only place that it's used is in Broker. Broker is defined as 
a case class, which already does the right thing for equals() and hashCode(). 
So, we don't need to override equals() and hashCode() there. After that, we can 
remove hashCode().
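The compiler-generated value equality a Scala case class provides can be illustrated with a Java record (Java 16+) as an analogue; the fields below are hypothetical, not Broker's actual definition:

```java
// Like a Scala case class, a record gets equals()/hashCode() generated
// from its components, so hand-written overrides are redundant.
record Broker(int id, String host, int port) { }
```

Two instances with equal fields compare equal and hash identically, which is exactly why the hand-written overrides can be dropped.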

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
  Labels: newbie, patch
 Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.





[jira] [Commented] (KAFKA-1810) Add IP Filtering / Whitelists-Blacklists

2015-03-14 Thread Jeff Holoman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362217#comment-14362217
 ] 

Jeff Holoman commented on KAFKA-1810:
-

Updated reviewboard https://reviews.apache.org/r/29714/diff/
 against branch origin/trunk

 Add IP Filtering / Whitelists-Blacklists 
 -

 Key: KAFKA-1810
 URL: https://issues.apache.org/jira/browse/KAFKA-1810
 Project: Kafka
  Issue Type: New Feature
  Components: core, network, security
Reporter: Jeff Holoman
Assignee: Jeff Holoman
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-1810.patch, KAFKA-1810_2015-01-15_19:47:14.patch, 
 KAFKA-1810_2015-03-15_01:13:12.patch


 While longer-term goals of security in Kafka are on the roadmap there exists 
 some value for the ability to restrict connection to Kafka brokers based on 
 IP address. This is not intended as a replacement for security but more of a 
 precaution against misconfiguration and to provide some level of control to 
 Kafka administrators about who is reading/writing to their cluster.
 1) In some organizations software administration vs o/s systems 
 administration and network administration is disjointed and not well 
 choreographed. Providing software administrators the ability to configure 
 their platform relatively independently (after initial configuration) from 
 Systems administrators is desirable.
 2) Configuration and deployment is sometimes error prone and there are 
 situations when test environments could erroneously read/write to production 
 environments
 3) An additional precaution against reading sensitive data is typically 
 welcomed in most large enterprise deployments.





Re: Review Request 29714: Patch for KAFKA-1810

2015-03-14 Thread Jeff Holoman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29714/
---

(Updated March 15, 2015, 5:13 a.m.)


Review request for kafka.


Bugs: KAFKA-1810
https://issues.apache.org/jira/browse/KAFKA-1810


Repository: kafka


Description (updated)
---

KAFKA-1810 ConfigDef Refactor


Diffs (updated)
-

  core/src/main/scala/kafka/network/IPFilter.scala PRE-CREATION 
  core/src/main/scala/kafka/network/SocketServer.scala 
76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/KafkaServer.scala 
dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/test/scala/unit/kafka/network/IpFilterTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
7f47e6f9a74314ed9e9f19d0c97931f3f2e49259 

Diff: https://reviews.apache.org/r/29714/diff/


Testing
---

This code centers around a new class, CIDRRange, in IPFilter.scala. The IPFilter 
class is created and holds two fields: the ruleType (allow|deny|none) and a 
list of CIDRRange objects. This is used in the SocketServer acceptor thread. 
The check does an exists over the list when the rule type is allow or deny. On 
object creation, we pre-calculate the lower and upper range values and store 
them as BigInts. The overhead of the check should be fairly minimal, as it 
involves converting the incoming IP address to a BigInt and then just comparing 
it to the low/high values. In writing this review up I realized that I can 
optimize this further by converting to BigInt first and moving that conversion 
out of the range check, which I can address.
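The range check described above can be sketched as a standalone class. This is only an illustration; the real CIDRRange lives in IPFilter.scala and its exact API may differ.

```java
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Parse a CIDR block, pre-compute the low/high bounds as BigIntegers,
// then test membership with two compares.
class CidrRange {
    private final BigInteger low, high;

    CidrRange(String cidr) {
        String[] parts = cidr.split("/");
        byte[] base = toBytes(parts[0]);
        int prefix = Integer.parseInt(parts[1]);
        BigInteger baseVal = new BigInteger(1, base);
        // Mask of the host bits that vary inside the block, e.g. a /24
        // over IPv4 leaves 8 host bits.
        BigInteger hostMask = BigInteger.ONE
                .shiftLeft(base.length * 8 - prefix)
                .subtract(BigInteger.ONE);
        low = baseVal.andNot(hostMask);  // lowest address in the block
        high = baseVal.or(hostMask);     // highest address in the block
    }

    // Membership test: one conversion plus two BigInteger compares.
    boolean contains(String ip) {
        BigInteger v = new BigInteger(1, toBytes(ip));
        return low.compareTo(v) <= 0 && v.compareTo(high) <= 0;
    }

    private static byte[] toBytes(String ip) {
        try {
            return InetAddress.getByName(ip).getAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("bad address: " + ip, e);
        }
    }
}
```

Note that `new BigInteger(1, bytes)` treats the address bytes as an unsigned value, so the same compare works for 4-byte IPv4 and 16-byte IPv6 addresses, though addresses of the two families should not be compared against each other.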

Testing covers the CIDRRange and IPFilter classes and validation of IPv6, IPv4, 
and configurations. Additionally, the functionality is tested in 
SocketServerTest. Other changes are just to assist in configuration.

I modified the SocketServerTest to use a method for grabbing the Socket server 
to make the code a bit more concise.

One key point is that, if there is an error in configuration, we halt the 
startup of the broker. The thinking there is that if you screw up 
security-related configs, you want to know about it right away rather than 
silently accept connections (thanks Joe Stein for the input).

There are two new exceptions related to this functionality, one to handle 
configuration errors and one to handle blocking the request. Currently the log 
level is set to INFO. Does it make sense to move this to WARN?


Thanks,

Jeff Holoman



[jira] [Updated] (KAFKA-1810) Add IP Filtering / Whitelists-Blacklists

2015-03-14 Thread Jeff Holoman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Holoman updated KAFKA-1810:

Attachment: KAFKA-1810_2015-03-15_01:13:12.patch






Re: Review Request 29714: Patch for KAFKA-1810

2015-03-14 Thread Jeff Holoman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29714/
---

(Updated March 15, 2015, 5:26 a.m.)


Review request for kafka.


Bugs: KAFKA-1810
https://issues.apache.org/jira/browse/KAFKA-1810


Repository: kafka


Description
---

KAFKA-1810 ConfigDef Refactor


Diffs
-

  core/src/main/scala/kafka/network/IPFilter.scala PRE-CREATION 
  core/src/main/scala/kafka/network/SocketServer.scala 
76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/KafkaServer.scala 
dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/test/scala/unit/kafka/network/IpFilterTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
7f47e6f9a74314ed9e9f19d0c97931f3f2e49259 

Diff: https://reviews.apache.org/r/29714/diff/


Testing (updated)
---

This code centers around a new class, CIDRRange, in IpFilter.scala. The IpFilter 
class is created and holds two fields: the ruleType (allow|deny|none) and a 
list of CIDRRange objects. This is used in the SocketServer acceptor thread. 
The check does an exists over the list when the rule type is allow or deny. On 
object creation, we pre-calculate the lower and upper range values and store 
them as BigInts. The overhead of the check should be fairly minimal, as it 
involves converting the incoming IP address to a BigInt and then just comparing 
it to the low/high values.

Testing covers the CIDRRange and IPFilter classes and validation of IPv6, IPv4, 
and configurations. Additionally, the functionality is tested in 
SocketServerTest. Other changes are just to assist in configuration.

I modified the SocketServerTest to use a method for grabbing the socket server 
to make the code a bit more concise.

One key point is that, if there is an error in configuration, we halt the 
startup of the broker. The thinking there is that if you screw up 
security-related configs, you want to know about it right away rather than 
silently accept connections (thanks Joe Stein for the input).

There is one new exception related to this functionality, to handle blocking 
the request. Currently the log level is set to WARN.

More details are in KIP-7
https://cwiki.apache.org/confluence/display/KAFKA/KIP-7+-+Security+-+IP+Filtering


Thanks,

Jeff Holoman



Re: Review Request 29714: Patch for KAFKA-1810

2015-03-14 Thread Jeff Holoman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29714/#review76500
---



core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/29714/#comment124033

Need to delete


- Jeff Holoman





[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2015-03-14 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14361666#comment-14361666
 ] 

Jiangjie Qin commented on KAFKA-1305:
-

I see. The risk of this approach is that the controller or broker could 
potentially be left in an inconsistent state, because a timeout does not 
necessarily occur on broker shutdown. In that case, some controller-to-broker 
messages are sent while others might not be.
I think the key problem with the current approach is that we mix the data 
plane and the control plane, i.e. controller messages and user data are 
handled by the same request handlers on the Kafka server. So controller 
messages usually sit in the queue behind many user requests, which can delay 
the handling of controller messages for an almost arbitrary time (the more 
leaders a broker has, the worse the situation will be). The right solution is 
probably having a separate thread handle controller messages, or prioritizing 
controller message handling. Giving priority to controller messages probably 
requires less change, because we just need to insert the controller message at 
the head of the queue instead of the tail.
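The head-of-queue idea can be sketched with a double-ended queue. The class and request names below are made up for illustration; the real request channel is more involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical request queue where controller messages jump ahead of
// queued user requests instead of waiting behind them.
class RequestQueue {
    private final Deque<String> queue = new ArrayDeque<>();

    synchronized void enqueue(String request, boolean fromController) {
        if (fromController) {
            queue.addFirst(request);   // controller traffic: head of queue
        } else {
            queue.addLast(request);    // user traffic: normal FIFO order
        }
    }

    synchronized String next() {
        return queue.pollFirst();      // handlers drain from the head
    }
}
```

One consequence of this design is that controller messages are handled as soon as a handler frees up, regardless of how many user requests are already queued, which directly addresses the arbitrary-delay problem described above.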

 Controller can hang on controlled shutdown with auto leader balance enabled
 ---

 Key: KAFKA-1305
 URL: https://issues.apache.org/jira/browse/KAFKA-1305
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Sriharsha Chintalapani
Priority: Blocker
 Fix For: 0.8.2.0, 0.9.0

 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
 KAFKA-1305_2014-10-13_07:30:45.patch


 This is relatively easy to reproduce especially when doing a rolling bounce.
 What happened here is as follows:
 1. The previous controller was bounced and broker 265 became the new 
 controller.
 2. I went on to do a controlled shutdown of broker 265 (the new controller).
 3. In the mean time the automatically scheduled preferred replica leader 
 election process started doing its thing and starts sending 
 LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).  
 (t@113 below).
 4. While that's happening, the controlled shutdown process on 265 succeeds 
 and proceeds to deregister itself from ZooKeeper and shuts down the socket 
 server.
 5. (ReplicaStateMachine actually removes deregistered brokers from the 
 controller channel manager's list of brokers to send requests to.  However, 
 that removal cannot take place (t@18 below) because preferred replica leader 
 election task owns the controller lock.)
 6. So the request thread to broker 265 gets into infinite retries.
 7. The entire broker shutdown process is blocked on controller shutdown for 
 the same reason (it needs to acquire the controller lock).
 Relevant portions from the thread-dump:
 Controller-265-to-broker-265-send-thread - Thread t@113
java.lang.Thread.State: TIMED_WAITING
   at java.lang.Thread.sleep(Native Method)
   at 
 kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
   at kafka.utils.Logging$class.swallow(Logging.scala:94)
   at kafka.utils.Utils$.swallow(Utils.scala:46)
   at 
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
   at 
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
   - locked java.lang.Object@6dbf14a7
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
Locked ownable synchronizers:
   - None
 ...
 Thread-4 - Thread t@17
java.lang.Thread.State: WAITING on 
 java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
 kafka-scheduler-0
   at sun.misc.Unsafe.park(Native Method)
   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
   at 
 java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
   at kafka.utils.Utils$.inLock(Utils.scala:536)
   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
   at 
 kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
   at kafka.utils.Utils$.swallow(Utils.scala:167)
   at 

Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-14 Thread Jiangjie Qin
Sounds reasonable. I have updated the KIP page accordingly.

On 3/12/15, 10:16 PM, Guozhang Wang wangg...@gmail.com wrote:

3) I think this is fine.
4) Hmm, error-message-only may NOT be better than blocking, as with code
written against close(>=0) it will likely just pollute the logs with
repeating errors until someone gets notified. How about
error-message-and-close(-1), i.e. record the error and force shutdown?

On Thu, Mar 12, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Guozhang,

 Thanks for the comments. I updated the page as suggested.
 For 3), that's right, I put this in the javadoc. Do you think we need to
 reject values other than -1?
 For 4), I think the user will notice this easily because the thread will
 block and the producer is not going to shut down. About using close(-1)
 quietly in the sender thread when close() is called, my concern is that
 the user might not be aware of that. Maybe we can put an error message
 if the user calls close() in the sender thread?

 Thanks,

 Jiangjie (Becket) Qin

 On 3/12/15, 5:13 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Becket,
 
 Some comments on the wiki page:
 
 1. There are a few typos, for example "multivations", "wiithin".
 
 2. I think the main motivation could just be: the current close() needs
 to block on flushing all buffered data; however, there are scenarios in
 which producers would like to close without blocking on flushing data,
 or even close immediately and make sure any buffered data are dropped
 instead of sent out. You can probably give some examples of such
 scenarios.
 
 3. close(-1, TimeUnit.MILLISECONDS) => from the implementation it seems
 any negative value has the same semantics.

 4. In the sender thread only close(-1, TimeUnit.MILLISECONDS) should be
 called => this is not programmatically enforced. Shall we just try to
 enforce it via, for example, checking if the caller thread is the IO
 thread, and if yes just use close(-1)?

 5. Proposed Changes => it seems you only talked about the close(-1)
 case; how about close(>=0)?
 
 Guozhang
 
 On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:
 
  Hey Joe  Jay,
 
  Thanks for the comments on the voting thread. Since it seems we
probably
  will have more discussion on this, I am just replying from the
 discussion
  thread here.
  I’ve updated the KIP page to make it less like half-baked, apologize
for
  the rush...
 
  The contract in current KIP is:
1. close() - wait until all requests either are sent or reach
request
  timeout.
2. close(-1, TimeUnit.MILLISECONDS) - close immediately
    3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait
  until all requests are sent or reach request timeout
4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending
 in 5
  milliseconds, if something went wrong, just shutdown the producer
 anyway,
  my callback will handle the failures.
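
The four-point contract above can be condensed into a small helper. This is
an illustrative sketch of the proposed semantics only, not producer code;
CloseContractSketch and effectiveWaitMs are invented names:

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the KIP's proposed close(timeout) contract.
public class CloseContractSketch {
    /**
     * Maps a close(timeout) argument to an effective wait in milliseconds:
     * negative -> 0 (close immediately, drop buffered data),
     * zero     -> unbounded (same as close(): wait for send or request timeout),
     * positive -> best effort within the given bound.
     */
    static long effectiveWaitMs(long timeout, TimeUnit unit) {
        if (timeout < 0) return 0L;
        if (timeout == 0) return Long.MAX_VALUE;
        return unit.toMillis(timeout);
    }

    public static void main(String[] args) {
        System.out.println(effectiveWaitMs(-1, TimeUnit.MILLISECONDS)); // 0
        System.out.println(effectiveWaitMs(0, TimeUnit.MILLISECONDS));  // 9223372036854775807
        System.out.println(effectiveWaitMs(5, TimeUnit.MILLISECONDS));  // 5
    }
}
```

Written this way, the special casing of 0 versus negative values is visible
at a glance, which is exactly the convention question debated below.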
 
  About how we define what the timeout value stands for, I actually
  struggled a little bit when writing the patch. Intuitively, close(0)
  should mean immediately; however, all the existing Java classes seem to
  follow the convention that timeout=0 means no timeout, i.e. never time
  out (Thread.join(0), Object.wait(0), etc.). So the dilemma is whether we
  follow the intuition or follow the convention. What I chose is to follow
  the convention but document the interface so users are aware of the
  usage. The reason is that producer.close() is a public interface, so it
  might be better to follow the Java convention, whereas the selector is
  not a public interface used by users, so as long as it makes sense to us
  it is less of a problem for it to differ from the Java convention. That
  said, since consumer.poll(timeout) is also a public interface, I think it
  also makes sense for producer.close() to have the same definition as
  consumer.poll(timeout).
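
The JDK convention referenced here is easy to demonstrate: Thread.join(0)
does not return after zero milliseconds but waits until the thread dies. A
small self-contained illustration:

```java
public class JoinZeroDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
        });
        long start = System.nanoTime();
        worker.start();
        worker.join(0); // per the JDK convention, 0 means "no timeout"
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // join(0) waited for the ~100 ms worker rather than returning at once.
        System.out.println(elapsedMs >= 90); // true
    }
}
```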
 
  The main argument for keeping a timeout in close would be separating the
  close timeout from the request timeout, which probably makes sense. I
  would guess the request timeout would typically be long (e.g. 60
  seconds), because we might want to allow retries with backoff time. If we
  have multiple batches in the accumulator, in the worst case it could take
  up to several minutes to complete all the requests. But when we close a
  producer, we might not want to wait that long, as it might cause some
  other problem, such as a deployment tool timeout.
 
  There is also a subtle difference between close(timeout) and
  flush(timeout). The only purpose of flush() is to write data to the
  broker, so it makes perfect sense to wait until the request timeout; I
  think that is why flush(timeout) looks strange. On the other hand, the
  top priority for close() is to close the producer rather than flush
  data, so close(timeout) gives a guarantee of bounded waiting for its
  main job.
 
  Sorry for the confusion about the forceClose flag. It is not a public
  interface. I mentioned it in the Proposed Changes section, which I
  thought was supposed to provide implementation details.

[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs

2015-03-14 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14361868#comment-14361868
 ] 

Jay Kreps commented on KAFKA-2020:
--

Is this the case where we are sending back an error code but ALSO sending 
back data, and the error code doesn't mean that there is an error but just 
means hey, I wanted to tell you something in the response so I filled in an 
error, hopefully you didn't handle that error like every single other error 
and throw an exception?

 I expect ReplicaNotAvailableException to have proper Javadocs
 -

 Key: KAFKA-2020
 URL: https://issues.apache.org/jira/browse/KAFKA-2020
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Chris Riccomini
Assignee: Neha Narkhede

 It looks like ReplicaNotAvailableException was copy and pasted from 
 LeaderNotAvailable exception. The Javadocs were never changed. This means 
 that users think that ReplicaNotAvailableException signifies leaders are not 
 available. This is very different from, I can ignore this exception, which 
 is what the Kafka protocol docs say to do with ReplicaNotAvailableException.
 Related: what's the point of ReplicaNotAvailableException if it's supposed to 
 be ignored?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-16: Replica Lag Tuning

2015-03-14 Thread Jay Kreps
+1

-Jay

On Fri, Mar 13, 2015 at 9:54 AM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Details in the KIP, Jira and RB.


 https://cwiki.apache.org/confluence/display/KAFKA/KIP+16+:+Automated+Replica+Lag+Tuning
 https://issues.apache.org/jira/browse/KAFKA-1546
 https://reviews.apache.org/r/31967/

 Aditya




[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs

2015-03-14 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14361935#comment-14361935
 ] 

Ewen Cheslack-Postava commented on KAFKA-2020:
--

Yes, exactly.



[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs

2015-03-14 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14361952#comment-14361952
 ] 

Jay Kreps commented on KAFKA-2020:
--

Yeah we need to just fix that. I actually strongly suspect our own Java 
clients have that bug too, right? Their exception handling has a set of 
known errors that get handled and then a catch-all throw for any other 
errors, which, in my opinion, is the best way to do error handling in the 
clients. [~junrao], [~nehanarkhede] do you guys have any context on how we 
ended up with this?



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-14 Thread Jay Kreps
Hey Jiangjie,

I think this is going to be very confusing that
  close(0) waits indefinitely and
  close(-1) waits for 0.
I understand this appears in other APIs, but it is a constant cause of
bugs. Let's not repeat that mistake.

Let's make close(0) wait for 0. We don't need a way to wait indefinitely as
we already have close() so having a magical constant for that is redundant.
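
Under the semantics proposed here the mapping would be monotone in the
argument, with no sentinel values. A hypothetical sketch for contrast with
the KIP's contract (AlternativeCloseSketch and boundedWaitMs are invented
names, not Kafka code):

```java
import java.util.concurrent.TimeUnit;

// Sketch of the alternative contract: close(0) waits zero time, any positive
// value bounds the wait, and only the no-arg close() waits indefinitely.
public class AlternativeCloseSketch {
    /** close(timeout): wait at most the given time; 0 means do not wait. */
    static long boundedWaitMs(long timeout, TimeUnit unit) {
        if (timeout < 0) throw new IllegalArgumentException("timeout must be >= 0");
        return unit.toMillis(timeout); // 0 -> 0 ms, no special cases
    }

    /** close(): wait indefinitely, so no sentinel value is needed. */
    static long unboundedWaitMs() {
        return Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(boundedWaitMs(0, TimeUnit.MILLISECONDS)); // 0
        System.out.println(boundedWaitMs(5, TimeUnit.SECONDS));      // 5000
        System.out.println(unboundedWaitMs() == Long.MAX_VALUE);     // true
    }
}
```

Rejecting negative arguments outright removes the -1 magic constant, since
the unbounded case is already expressed by the no-arg overload.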

Calling close() from the I/O thread was already possible and would block
indefinitely. I think trying to silently change the behavior is probably
not right. That is, if the user calls close() in the callback, there is
some misunderstanding and they need to think more; silently making this
non-blocking would hide the problem from them, which is the opposite of
what we want.
-Jay

On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Joe & Jay,

 Thanks for the comments on the voting thread. Since it seems we probably
 will have more discussion on this, I am just replying from the discussion
 thread here.
 I've updated the KIP page to make it less half-baked; apologies for the
 rush...

 The contract in current KIP is:
   1. close() - wait until all requests either are sent or reach request
 timeout.
   2. close(-1, TimeUnit.MILLISECONDS) - close immediately
   3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait
 until all requests are sent or reach request timeout
   4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5
 milliseconds, if something went wrong, just shutdown the producer anyway,
 my callback will handle the failures.

 About how we define what the timeout value stands for, I actually
 struggled a little bit when writing the patch. Intuitively, close(0)
 should mean immediately; however, all the existing Java classes seem to
 follow the convention that timeout=0 means no timeout, i.e. never time out
 (Thread.join(0), Object.wait(0), etc.). So the dilemma is whether we
 follow the intuition or follow the convention. What I chose is to follow
 the convention but document the interface so users are aware of the usage.
 The reason is that producer.close() is a public interface, so it might be
 better to follow the Java convention, whereas the selector is not a public
 interface used by users, so as long as it makes sense to us it is less of
 a problem for it to differ from the Java convention. That said, since
 consumer.poll(timeout) is also a public interface, I think it also makes
 sense for producer.close() to have the same definition as
 consumer.poll(timeout).

 The main argument for keeping a timeout in close would be separating the
 close timeout from the request timeout, which probably makes sense. I
 would guess the request timeout would typically be long (e.g. 60 seconds),
 because we might want to allow retries with backoff time. If we have
 multiple batches in the accumulator, in the worst case it could take up to
 several minutes to complete all the requests. But when we close a
 producer, we might not want to wait that long, as it might cause some
 other problem, such as a deployment tool timeout.

 There is also a subtle difference between close(timeout) and
 flush(timeout). The only purpose of flush() is to write data to the
 broker, so it makes perfect sense to wait until the request timeout; I
 think that is why flush(timeout) looks strange. On the other hand, the top
 priority for close() is to close the producer rather than flush data, so
 close(timeout) gives a guarantee of bounded waiting for its main job.

 Sorry for the confusion about the forceClose flag. It is not a public
 interface. I mentioned it in the Proposed Changes section, which I thought
 was supposed to provide implementation details.

 Thanks again for all the comments and suggestions!

 Jiangjie (Becket) Qin

 On 3/10/15, 8:57 PM, Jiangjie Qin j...@linkedin.com wrote:

 The KIP page has been updated per Jay's comments.
 I'd like to initiate the voting process if no further comments are
 received by tomorrow.
 
 Jiangjie (Becket) Qin
 
 On 3/8/15, 9:45 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Hey Jiangjie,
 
 Can you capture the full motivation and use cases for the feature? This
 mentions your interest in having a way of aborting from inside the
 Callback. But it doesn't really explain that usage or why other people
 would want to do that. It also doesn't list the primary use case for
 having
 close with a bounded timeout which was to avoid blocking too long on
 shutdown.
 
 -Jay
 
 
 
 On Sat, Mar 7, 2015 at 12:25 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hi,
 
  I just created a KIP for adding a close(timeout) to new producer. Most
 of
  the previous discussions are in KAFKA-1660 where Parth Brahmbhatt has
  already done a lot of work.
  Since this is an interface change so we are going through the KIP
 process.
  Here is the KIP link:
 
 
  https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53739782
 
  Thanks.
 
  Jiangjie (Becket) Qin