[jira] [Commented] (KAFKA-1298) Controlled shutdown tool doesn't seem to work out of the box
[ https://issues.apache.org/jira/browse/KAFKA-1298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362047#comment-14362047 ]

Grzegorz Dubicki commented on KAFKA-1298:
-----------------------------------------

I think that the controlled shutdown tool docs here https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-1.ControlledShutdown should also be updated. I would do that myself but I don't have the proper permissions in that Confluence.

Controlled shutdown tool doesn't seem to work out of the box
------------------------------------------------------------

                Key: KAFKA-1298
                URL: https://issues.apache.org/jira/browse/KAFKA-1298
            Project: Kafka
         Issue Type: Improvement
           Reporter: Jay Kreps
           Assignee: Sriharsha Chintalapani
             Labels: usability
            Fix For: 0.8.2.0
        Attachments: KAFKA-1298.patch, KAFKA-1298.patch, KAFKA-1298_2014-06-09_17:20:44.patch

Download Kafka and try to use our shutdown tool. Got this:

bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181 --broker 0
[2014-03-06 16:58:23,636] ERROR Operation failed due to controller failure (kafka.admin.ShutdownBroker$)
java.io.IOException: Failed to retrieve RMIServer stub: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: jkreps-mn.linkedin.biz; nested exception is: java.net.ConnectException: Connection refused]
	at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:340)
	at javax.management.remote.JMXConnectorFactory.connect(JMXConnectorFactory.java:249)
	at kafka.admin.ShutdownBroker$.kafka$admin$ShutdownBroker$$invokeShutdown(ShutdownBroker.scala:56)
	at kafka.admin.ShutdownBroker$.main(ShutdownBroker.scala:109)
	at kafka.admin.ShutdownBroker.main(ShutdownBroker.scala)
Caused by: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: jkreps-mn.linkedin.biz; nested exception is: java.net.ConnectException: Connection refused]
	at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:101)
	at com.sun.jndi.toolkit.url.GenericURLContext.lookup(GenericURLContext.java:185)
	at javax.naming.InitialContext.lookup(InitialContext.java:392)
	at javax.management.remote.rmi.RMIConnector.findRMIServerJNDI(RMIConnector.java:1888)
	at javax.management.remote.rmi.RMIConnector.findRMIServer(RMIConnector.java:1858)
	at javax.management.remote.rmi.RMIConnector.connect(RMIConnector.java:257)
	... 4 more
Caused by: java.rmi.ConnectException: Connection refused to host: jkreps-mn.linkedin.biz; nested exception is: java.net.ConnectException: Connection refused
	at sun.rmi.transport.tcp.TCPEndpoint.newSocket(TCPEndpoint.java:601)
	at sun.rmi.transport.tcp.TCPChannel.createConnection(TCPChannel.java:198)
	at sun.rmi.transport.tcp.TCPChannel.newConnection(TCPChannel.java:184)
	at sun.rmi.server.UnicastRef.newCall(UnicastRef.java:322)
	at sun.rmi.registry.RegistryImpl_Stub.lookup(Unknown Source)
	at com.sun.jndi.rmi.registry.RegistryContext.lookup(RegistryContext.java:97)
	... 9 more
Caused by: java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:382)
	at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:241)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:228)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:431)
	at java.net.Socket.connect(Socket.java:527)
	at java.net.Socket.connect(Socket.java:476)
	at java.net.Socket.<init>(Socket.java:373)
	at java.net.Socket.<init>(Socket.java:187)
	at sun.rmi.transport.proxy.RMIDirectSocketFactory.createSocket(RMIDirectSocketFactory.java:22)
	at sun.rmi.transport.proxy.RMIMasterSocketFactory.createSocket(RMIMasterSocketFactory.java:128)
	at sun.rmi.transport.tcp.TCPEndpoint.newSocket(TCPEndpoint.java:595)
	... 14 more

Oh god, RMI?!!!??? Presumably this is because we stopped setting the JMX port by default. This is good because setting the JMX port breaks the quickstart, which requires running multiple nodes on a single machine. The root cause imo is just using RMI here instead of our regular RPC.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
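As a workaround for the error above, the tool can only reach the broker over JMX if the broker was started with a JMX port exported. A hedged sketch, assuming the standard Kafka launch scripts honor the JMX_PORT environment variable (port 9999 is an arbitrary choice here, not a Kafka default):

```shell
# Assumption: kafka-run-class.sh (and kafka-server-start.sh, which wraps it)
# enables remote JMX only when JMX_PORT is exported. Start the broker with it:
JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties

# Then the RMI-based shutdown tool has a JMX endpoint to connect to:
bin/kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181 --broker 0
```

Note this conflicts with the quickstart scenario mentioned above: with a fixed JMX_PORT, multiple brokers cannot share one machine.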
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Hi Jay,

I have modified the KIP as you suggested. I think as long as we have a consistent definition of timeout across the Kafka interfaces, there should be no problem. I also agree it is better if we can make the producer block when close() is called from the sender thread, so the user will notice something went wrong.

Thanks.

Jiangjie (Becket) Qin

On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Jiangjie,

I think it is going to be very confusing that close(0) waits indefinitely and close(-1) waits for 0. I understand this appears in other APIs, but it is a constant cause of bugs. Let's not repeat that mistake. Let's make close(0) wait for 0. We don't need a way to wait indefinitely, as we already have close(), so having a magical constant for that is redundant. Calling close() from the I/O thread was already possible and would block indefinitely. I think trying to silently change the behavior is probably not right. I.e., if the user calls close() in the callback there is actually some misunderstanding and they need to think more; silently making this not block will hide the problem from them, which is the opposite of what we want.

-Jay

On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hey Joe & Jay,

Thanks for the comments on the voting thread. Since it seems we will probably have more discussion on this, I am just replying from the discussion thread here. I've updated the KIP page to make it look less half-baked; apologies for the rush...

The contract in the current KIP is:
1. close() - wait until all requests either are sent or reach the request timeout.
2. close(-1, TimeUnit.MILLISECONDS) - close immediately.
3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait until all requests are sent or reach the request timeout.
4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5 milliseconds; if something went wrong, just shut down the producer anyway, my callback will handle the failures.
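The four-point contract above can be condensed into a small helper. This is a hypothetical sketch of the timeout mapping as stated in this KIP draft, not the released producer API, and note that Jay argues against the close(0)-waits-forever convention in this very thread:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the KIP draft's timeout contract:
// negative  -> close immediately (wait 0 ms),
// zero      -> unbounded wait, same as close() (modeled as Long.MAX_VALUE),
// positive  -> best-effort wait of at most that duration.
class CloseTimeout {
    static long effectiveWaitMs(long timeout, TimeUnit unit) {
        if (timeout < 0) return 0L;               // close(-1): immediate
        if (timeout == 0) return Long.MAX_VALUE;  // close(0): equivalent to close()
        return unit.toMillis(timeout);            // close(5): bounded wait
    }
}
```

Jay's counter-proposal would instead map timeout 0 to a 0 ms wait and drop the magic negative value entirely, since close() already covers the unbounded case.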
About how we define what the timeout value stands for, I actually struggled a little bit when I wrote the patch. Intuitively, close(0) should mean immediately; however, it seems that all the existing Java classes have the convention that timeout=0 means no timeout or never time out (Thread.join(0), Object.wait(0), etc.). So the dilemma is whether we follow the intuition or follow the convention. What I chose is to follow the convention but document the interface so users are aware of the behavior. The reason is that producer.close() is a public interface, so it might be better to follow the Java convention; whereas the selector is not a public interface used by users, so as long as it makes sense to us, it is less of a problem to differ from the Java convention. That said, since consumer.poll(timeout) is also a public interface, I think it also makes sense to give producer.close() the same timeout semantics as consumer.poll(timeout).

The main argument for keeping a timeout in close would be separating the close timeout from the request timeout, which probably makes sense. I would guess typically the request timeout would be long (e.g. 60 seconds) because we might want to allow for retries with backoff time. If we have multiple batches in the accumulator, in the worst case it could take up to several minutes to complete all the requests. But when we close a producer, we might not want to wait that long, as it might cause some other problem, like a deployment tool timeout.

There is also a subtle difference between close(timeout) and flush(timeout). The only purpose of flush() is to write data to the broker, so it makes perfect sense to wait until the request timeout; I think that is why flush(timeout) looks strange. On the other hand, the top priority for close() is to close the producer rather than to flush data, so close(timeout) gives a bounded-wait guarantee on its main job.

Sorry for the confusion about the forceClose flag. It is not a public interface.
I mentioned it in the Proposed Changes section, which I thought was supposed to provide implementation details.

Thanks again for all the comments and suggestions!

Jiangjie (Becket) Qin

On 3/10/15, 8:57 PM, Jiangjie Qin j...@linkedin.com wrote:

The KIP page has been updated per Jay's comments. I'd like to initiate the voting process if no further comments are received by tomorrow.

Jiangjie (Becket) Qin

On 3/8/15, 9:45 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Jiangjie,

Can you capture the full motivation and use cases for the feature? This mentions your interest in having a way of aborting from inside the Callback. But it doesn't really explain that usage or why other people would want to do that. It also doesn't list the primary use case for having close with a bounded timeout, which was to avoid blocking too long on shutdown.

-Jay

On Sat, Mar 7, 2015 at 12:25 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi, I just created
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362111#comment-14362111 ]

Jun Rao commented on KAFKA-1926:
--------------------------------

[~tongli], thanks for the patch. I made a pass over all methods in CoreUtils. I recommend that we do the following.

1. The following methods are general purpose and should be moved to o.a.k.common.utils: readBytes, loadProps, stackTrace, daemonThread, newThread, croak.

2. The following methods don't seem to be used and can be removed: daemonThread(runnable: Runnable), newThread(runnable: Runnable, daemon: Boolean), readUnsignedInt(buffer: ByteBuffer), writeUnsignedInt(buffer: ByteBuffer, value: Long), equal, groupby, notNull, nullOrEmpty, createFile, asString, readProps.

3. readString(): The only non-test usage is in DefaultEventHandler. The usage is actually incorrect, since the payload may not be convertible to a string. Instead of using readString(), we should just use message.message.toString. We should also change Message.toString to be the same as Record.toString. Then we can move readString() to TestUtils.

4. hashCode(): The only place it's used is in Broker. Broker is defined as a case class, which will do the right thing for equals() and hashCode(), so we don't need to override equals() and hashCode() there. After that, we can remove hashCode().

Replace kafka.utils.Utils with o.a.k.common.utils.Utils
-------------------------------------------------------

                Key: KAFKA-1926
                URL: https://issues.apache.org/jira/browse/KAFKA-1926
            Project: Kafka
         Issue Type: Improvement
   Affects Versions: 0.8.2.0
           Reporter: Jay Kreps
             Labels: newbie, patch
        Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch

There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the Scala Utils and do one of the following:

1.
Migrate it to o.a.k.common.utils.Utils if it is a sensible general-purpose utility in active use that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over.

2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient.

3. Delete it if it is not used or has a bad API.
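Jun's point 4 above turns on structural equality being derived automatically. The Java analogue of a Scala case class is a record; the fields below are illustrative, not Kafka's actual Broker definition:

```java
// Like a Scala case class, a Java record derives structural equals() and
// hashCode() from its components, so hand-written overrides are unnecessary.
// Hypothetical fields for illustration only.
record Broker(int id, String host, int port) {}
```

Two instances built from the same components compare equal and hash identically, which is exactly why the explicit equals()/hashCode() overrides in Broker can be deleted once it is a case class.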
[jira] [Commented] (KAFKA-1810) Add IP Filtering / Whitelists-Blacklists
[ https://issues.apache.org/jira/browse/KAFKA-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362217#comment-14362217 ]

Jeff Holoman commented on KAFKA-1810:
-------------------------------------

Updated reviewboard https://reviews.apache.org/r/29714/diff/ against branch origin/trunk

Add IP Filtering / Whitelists-Blacklists
----------------------------------------

                Key: KAFKA-1810
                URL: https://issues.apache.org/jira/browse/KAFKA-1810
            Project: Kafka
         Issue Type: New Feature
         Components: core, network, security
           Reporter: Jeff Holoman
           Assignee: Jeff Holoman
           Priority: Minor
            Fix For: 0.8.3
        Attachments: KAFKA-1810.patch, KAFKA-1810_2015-01-15_19:47:14.patch, KAFKA-1810_2015-03-15_01:13:12.patch

While the longer-term security goals for Kafka are on the roadmap, there is value in the ability to restrict connections to Kafka brokers based on IP address. This is not intended as a replacement for security, but more as a precaution against misconfiguration and a way to give Kafka administrators some level of control over who is reading from and writing to their cluster.

1) In some organizations, software administration vs. OS systems administration and network administration is disjointed and not well choreographed. Giving software administrators the ability to configure their platform relatively independently (after initial configuration) of systems administrators is desirable.

2) Configuration and deployment is sometimes error-prone, and there are situations where test environments could erroneously read from or write to production environments.

3) An additional precaution against reading sensitive data is typically welcomed in most large enterprise deployments.
Re: Review Request 29714: Patch for KAFKA-1810
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29714/
-----------------------------------------------------------

(Updated March 15, 2015, 5:13 a.m.)

Review request for kafka.

Bugs: KAFKA-1810
    https://issues.apache.org/jira/browse/KAFKA-1810

Repository: kafka

Description (updated)
---------------------

KAFKA-1810 ConfigDef Refactor

Diffs (updated)
---------------

  core/src/main/scala/kafka/network/IPFilter.scala PRE-CREATION
  core/src/main/scala/kafka/network/SocketServer.scala 76ce41aed6e04ac5ba88395c4d5008aca17f9a73
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c
  core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7
  core/src/test/scala/unit/kafka/network/IpFilterTest.scala PRE-CREATION
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 7f47e6f9a74314ed9e9f19d0c97931f3f2e49259

Diff: https://reviews.apache.org/r/29714/diff/

Testing
-------

This code centers around a new class, CIDRRange, in IPFilter.scala. The IPFilter class is created and holds two fields: the ruleType (allow|deny|none) and a list of CIDRRange objects. This is used in the socket server acceptor thread. The check does an existence test against the list if the rule type is allow or deny. On object creation, we pre-calculate the lower and upper range values and store those as a BigInt. The overhead of the check should be fairly minimal, as it involves converting the incoming IP address to a BigInt and then just doing a compare against the low/high values. In writing this review up I realized that I can optimize this further by converting to BigInt first and moving that conversion out of the range check, which I can address.

Testing covers the CIDRRange and IPFilter classes and validation of IPv6, IPv4, and configurations. Additionally, the functionality is tested in SocketServerTest. Other changes are just to assist in configuration. I modified SocketServerTest to use a method for grabbing the socket server to make the code a bit more concise.

One key point is that, if there is an error in configuration, we halt the startup of the broker. The thinking there is that if you screw up security-related configs, you want to know about it right away rather than silently accept connections. (Thanks Joe Stein for the input.) There are two new exceptions related to this functionality: one to handle configuration errors, and one to handle blocking the request. Currently the level is set to INFO. Does it make sense to move this to WARN?

Thanks,

Jeff Holoman
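The pre-computed low/high range check described in the testing notes can be sketched as follows. This is a hypothetical reimplementation for illustration, not the patch's actual CIDRRange class; it follows the same idea of deriving a lower and upper bound from the CIDR prefix once, then range-checking each incoming address with two comparisons.

```java
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch of a CIDR range check: parse "addr/prefix", pre-compute
// the low/high bounds as BigInteger, then test membership with two compares.
// Works for both IPv4 (32 bits) and IPv6 (128 bits) via the address length.
class CidrRange {
    private final BigInteger low;
    private final BigInteger high;

    CidrRange(String cidr) {
        String[] parts = cidr.split("/");
        byte[] addr = toAddress(parts[0]);
        int prefix = Integer.parseInt(parts[1]);
        int bits = addr.length * 8;                     // 32 for IPv4, 128 for IPv6
        BigInteger base = new BigInteger(1, addr);      // unsigned interpretation
        BigInteger hostMask = BigInteger.ONE.shiftLeft(bits - prefix).subtract(BigInteger.ONE);
        this.low = base.andNot(hostMask);               // network address
        this.high = base.or(hostMask);                  // top of the range
    }

    boolean contains(String ip) {
        BigInteger value = new BigInteger(1, toAddress(ip));
        return value.compareTo(low) >= 0 && value.compareTo(high) <= 0;
    }

    private static byte[] toAddress(String ip) {
        try {
            return InetAddress.getByName(ip).getAddress(); // literal IPs: no DNS lookup
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("bad address: " + ip, e);
        }
    }
}
```

With the bounds cached per rule, the acceptor-thread cost per connection is one byte-array-to-BigInteger conversion plus two comparisons, matching the "fairly minimal overhead" claim above.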
[jira] [Updated] (KAFKA-1810) Add IP Filtering / Whitelists-Blacklists
[ https://issues.apache.org/jira/browse/KAFKA-1810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Holoman updated KAFKA-1810:
--------------------------------

Attachment: KAFKA-1810_2015-03-15_01:13:12.patch
Re: Review Request 29714: Patch for KAFKA-1810
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29714/
-----------------------------------------------------------

(Updated March 15, 2015, 5:26 a.m.)

Review request for kafka.

Bugs: KAFKA-1810
    https://issues.apache.org/jira/browse/KAFKA-1810

Repository: kafka

Description
-----------

KAFKA-1810 ConfigDef Refactor

Diffs
-----

  core/src/main/scala/kafka/network/IPFilter.scala PRE-CREATION
  core/src/main/scala/kafka/network/SocketServer.scala 76ce41aed6e04ac5ba88395c4d5008aca17f9a73
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c
  core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7
  core/src/test/scala/unit/kafka/network/IpFilterTest.scala PRE-CREATION
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 7f47e6f9a74314ed9e9f19d0c97931f3f2e49259

Diff: https://reviews.apache.org/r/29714/diff/

Testing (updated)
-----------------

This code centers around a new class, CIDRRange, in IpFilter.scala. The IpFilter class is created and holds two fields: the ruleType (allow|deny|none) and a list of CIDRRange objects. This is used in the socket server acceptor thread. The check does an existence test against the list if the rule type is allow or deny. On object creation, we pre-calculate the lower and upper range values and store those as a BigInt. The overhead of the check should be fairly minimal, as it involves converting the incoming IP address to a BigInt and then just doing a compare against the low/high values.

Testing covers the CIDRRange and IpFilter classes and validation of IPv6, IPv4, and configurations. Additionally, the functionality is tested in SocketServerTest. Other changes are just to assist in configuration. I modified SocketServerTest to use a method for grabbing the socket server to make the code a bit more concise.

One key point is that, if there is an error in configuration, we halt the startup of the broker. The thinking there is that if you screw up security-related configs, you want to know about it right away rather than silently accept connections. (Thanks Joe Stein for the input.) There is one new exception related to this functionality, to handle blocking the request. Currently the level is set to WARN.

More details are in KIP-7: https://cwiki.apache.org/confluence/display/KAFKA/KIP-7+-+Security+-+IP+Filtering

Thanks,

Jeff Holoman
Re: Review Request 29714: Patch for KAFKA-1810
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29714/#review76500
-----------------------------------------------------------

core/src/main/scala/kafka/server/KafkaConfig.scala
https://reviews.apache.org/r/29714/#comment124033

    Need to delete

- Jeff Holoman

On March 15, 2015, 5:13 a.m., Jeff Holoman wrote: ...
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14361666#comment-14361666 ]

Jiangjie Qin commented on KAFKA-1305:
-------------------------------------

I see. The risk of this approach is that the controller or broker could potentially end up in an inconsistent state, because a timeout does not necessarily occur on broker shutdown. In that case, some controller-to-broker messages are sent while others might not be. I think the key problem with the current approach is that we mix the data plane and the control plane, i.e. controller messages and user data are handled by the same request handlers on the Kafka server. So controller messages usually sit in the queue behind many user requests, which can delay the handling of controller messages for an almost arbitrary time (the more leaders a broker has, the worse the situation will be). The right solution is probably to have a separate thread handle controller messages, or to prioritize controller message handling. Giving priority to controller messages is probably the smaller change, because we just need to insert the controller message at the head of the queue instead of the tail.

Controller can hang on controlled shutdown with auto leader balance enabled
---------------------------------------------------------------------------

                Key: KAFKA-1305
                URL: https://issues.apache.org/jira/browse/KAFKA-1305
            Project: Kafka
         Issue Type: Bug
           Reporter: Joel Koshy
           Assignee: Sriharsha Chintalapani
           Priority: Blocker
            Fix For: 0.8.2.0, 0.9.0
        Attachments: KAFKA-1305.patch, KAFKA-1305.patch, KAFKA-1305_2014-10-13_07:30:45.patch

This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows:

1. The previous controller was bounced and broker 265 became the new controller.
2. I went on to do a controlled shutdown of broker 265 (the new controller).
3. In the meantime, the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) (t@113 below).
4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server.
5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. However, that removal cannot take place (t@18 below) because the preferred replica leader election task owns the controller lock.)
6. So the request thread to broker 265 gets into infinite retries.
7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock).

Relevant portions from the thread dump:

Controller-265-to-broker-265-send-thread - Thread t@113
  java.lang.Thread.State: TIMED_WAITING
	at java.lang.Thread.sleep(Native Method)
	at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
	at kafka.utils.Utils$.swallow(Utils.scala:167)
	at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
	at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
	at kafka.utils.Logging$class.swallow(Logging.scala:94)
	at kafka.utils.Utils$.swallow(Utils.scala:46)
	at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
	- locked java.lang.Object@6dbf14a7
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

  Locked ownable synchronizers:
	- None

...

Thread-4 - Thread t@17
  java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
	at kafka.utils.Utils$.inLock(Utils.scala:536)
	at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
	at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
	at kafka.utils.Utils$.swallow(Utils.scala:167)
	at
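Jiangjie's suggestion in the comment above, inserting controller messages at the head of the shared request queue rather than the tail, can be sketched with a deque. This is a hypothetical illustration of the prioritization idea, not Kafka's actual request channel:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of prioritizing the control plane over the data plane:
// controller messages are enqueued at the head of the shared queue, so the
// request handlers see them before the backlog of user requests.
class RequestQueue {
    private final Deque<String> queue = new ArrayDeque<>();

    synchronized void enqueue(String request, boolean fromController) {
        if (fromController) {
            queue.addFirst(request);  // control plane jumps ahead of the backlog
        } else {
            queue.addLast(request);   // data plane waits its turn
        }
    }

    synchronized String poll() {
        return queue.pollFirst();     // null when empty
    }
}
```

The fuller alternative mentioned above, a dedicated thread (or channel) for controller messages, removes the contention entirely but is a larger change than reordering one queue insert.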
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Sounds reasonable. I have updated the KIP page accordingly.

On 3/12/15, 10:16 PM, Guozhang Wang wangg...@gmail.com wrote:

3) I think this is fine.
4) Hmm, error-message-only may NOT be better than blocking, as with code written with close(>=0) it will likely just pollute the logs with repeating errors until someone gets notified. How about error-message-and-close(-1), i.e. record the error and force shutdown?

On Thu, Mar 12, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hey Guozhang,

Thanks for the comments. I updated the page as suggested. For 3), that's right, I put this in the javadoc. Do you think we need to reject values other than -1? For 4), I think the user will notice this easily because the thread will block and the producer is not going to shut down. About using close(-1) quietly in the sender thread when close() is called, my concern is that the user might not be aware of that. Maybe we can log an error message if the user calls close() in the sender thread?

Thanks,

Jiangjie (Becket) Qin

On 3/12/15, 5:13 AM, Guozhang Wang wangg...@gmail.com wrote:

Hi Becket,

Some comments on the wiki page:

1. There are a few typos, for example multivations, wiithin.
2. I think the main motivation could just be: the current close() needs to block on flushing all buffered data; however, there are scenarios in which producers would like to close without blocking on flushing data, or even close immediately and make sure any buffered data is dropped instead of sent out. You can probably give some examples of such scenarios.
3. close(-1, TimeUnit.MILLISECONDS) => from the implementation it seems any negative value has the same semantics.
4. "In the sender thread only close(-1, TimeUnit.MILLISECONDS) should be called" => this is not programmatically enforced. Shall we just try to enforce it via, for example, checking if the caller thread is the I/O thread, and if yes just use close(-1)?
5. Proposed Changes => it seems you only talked about the close(-1) case; how about close(>=0)?
Guozhang On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Joe Jay, Thanks for the comments on the voting thread. Since it seems we probably will have more discussion on this, I am just replying from the discussion thread here. I’ve updated the KIP page to make it less like half-baked, apologize for the rush... The contract in current KIP is: 1. close() - wait until all requests either are sent or reach request timeout. 2. close(-1, TimeUnit.MILLISECONDS) - close immediately 3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. Wait until all requests are sent or reach request timeout 4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5 milliseconds, if something went wrong, just shutdown the producer anyway, my callback will handle the failures. About how we define what timeout value stands for, I actually struggled a little bit when wrote the patch. Intuitively, close(0) should mean immediately, however it seems that all the existing java class have this convention of timeout=0 means no timeout or never timeout (Thread.join(0), Object.wait(0), etc.) So here the dilemma is either we follow the intuition or we follow the convention. What I chose is to follow the convention but document the interface to let user be aware of the usage. The reason is that I think producer.close() is a public interface so it might be better to follow java convention. Whereas selector is not a public interface that used by user, so as long as it makes sense to us, it is less a problem to be different from java convention. That said since consumer.poll(timeout) is also a public interface, I think it also makes sense to make producer.close() to have the same definition of consumer.poll(timeout). The main argument for keeping a timeout in close would be separating the close timeout from request timeout, which probably makes sense. I would guess typically the request timeout would be long (e.g. 
60 seconds) because we might want to allow for retries with backoff time. If we have multiple batches in the accumulator, in the worst case it could take up to several minutes to complete all the requests. But when we close a producer, we might not want to wait that long, as it might cause some other problem like a deployment tool timeout.

There is also a subtle difference between close(timeout) and flush(timeout). The only purpose of flush() is to write data to the broker, so it makes perfect sense to wait until the request timeout; I think that is why flush(timeout) looks strange. On the other hand, the top priority for close() is to close the producer rather than flush data, so close(timeout) gives a bounded-waiting guarantee on its main job.

Sorry for the confusion about the forceClose flag. It is not a public interface. I mentioned it in the Proposed Changes section, which I thought was supposed to provide implementation details.
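The four-case contract Becket enumerates can be sketched as a toy model. This is NOT the real KafkaProducer; a CountDownLatch stands in for "all in-flight requests completed", and only the timeout arithmetic is modeled:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model of the KIP's proposed contract, not the real producer:
//   timeout < 0  -> force close: return immediately, pending sends dropped
//   timeout == 0 -> Java convention: no timeout, wait indefinitely
//   timeout > 0  -> bounded wait, give up after the timeout elapses
class ToyCloser {
    static boolean close(long timeoutMs, CountDownLatch pending) {
        try {
            if (timeoutMs < 0) {
                return false; // close(-1): do not wait at all
            }
            if (timeoutMs == 0) {
                pending.await(); // close(0): block until everything is flushed
                return true;
            }
            return pending.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Seen this way, the "dilemma" in the thread is entirely about the `timeoutMs == 0` branch: the convention (block forever) versus the intuition (return immediately).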
[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs
[ https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14361868#comment-14361868 ] Jay Kreps commented on KAFKA-2020:

Is this the case where we send back an error code but ALSO send back data, and the error code doesn't mean there is an error but just means "hey, I wanted to tell you something in the response, so I filled in an error"? Hopefully you didn't handle that error like every single other error and throw an exception?

I expect ReplicaNotAvailableException to have proper Javadocs

Key: KAFKA-2020 URL: https://issues.apache.org/jira/browse/KAFKA-2020 Project: Kafka Issue Type: Bug Components: consumer Reporter: Chris Riccomini Assignee: Neha Narkhede

It looks like ReplicaNotAvailableException was copied and pasted from LeaderNotAvailableException, and the Javadocs were never changed. This means users think that ReplicaNotAvailableException signifies that leaders are not available. This is very different from "I can ignore this exception," which is what the Kafka protocol docs say to do with ReplicaNotAvailableException. Related: what's the point of ReplicaNotAvailableException if it's supposed to be ignored?

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] KIP-16: Replica Lag Tuning
+1 -Jay

On Fri, Mar 13, 2015 at 9:54 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Details in the KIP, Jira and RB.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+16+:+Automated+Replica+Lag+Tuning
https://issues.apache.org/jira/browse/KAFKA-1546
https://reviews.apache.org/r/31967/

Aditya
[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs
[ https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14361935#comment-14361935 ] Ewen Cheslack-Postava commented on KAFKA-2020:

Yes, exactly.
[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs
[ https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14361952#comment-14361952 ] Jay Kreps commented on KAFKA-2020:

Yeah, we need to just fix that. I actually strongly suspect our own Java clients have that bug too, right? Their exception handling has a set of known errors that get handled and then a catch-all throw for any other errors, which, in my opinion, is the best way to do error handling in the clients. [~junrao], [~nehanarkhede] do you guys have any context on how we ended up with this?
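The pattern Jay describes, and the way ReplicaNotAvailable breaks it, can be sketched as follows. The numeric codes match the Kafka protocol error table (5 = LeaderNotAvailable, 9 = ReplicaNotAvailable), but the class itself is purely illustrative, not the real o.a.k.common.protocol.Errors:

```java
// Hedged sketch of known-error handling with a catch-all throw. The bug
// under discussion: a generic "throw on any nonzero code" loop will also
// throw for ReplicaNotAvailable, which the protocol docs say to ignore.
class ResponseErrors {
    static final short NONE = 0;
    static final short LEADER_NOT_AVAILABLE = 5;
    static final short REPLICA_NOT_AVAILABLE = 9;

    // Returns true when the caller may keep using the response.
    static boolean check(short code) {
        if (code == NONE) return true;
        // Informational only per the protocol docs: safe to proceed.
        if (code == REPLICA_NOT_AVAILABLE) return true;
        if (code == LEADER_NOT_AVAILABLE) throw new IllegalStateException("leader not available");
        // Catch-all for any code without an explicit handler.
        throw new IllegalStateException("unexpected error code: " + code);
    }
}
```

An ignorable code is the odd one out here: every other nonzero code maps to a throw, so a client has to special-case it explicitly, which is exactly what a copy-pasted Javadoc hides.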
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Hey Jiangjie, I think it is going to be very confusing that close(0) waits indefinitely and close(-1) waits for 0. I understand this convention appears in other APIs, but it is a constant cause of bugs. Let's not repeat that mistake: let's make close(0) wait for 0. We don't need a way to wait indefinitely, as we already have close(), so having a magical constant for that is redundant.

Calling close() from the I/O thread was already possible and would block indefinitely. I think trying to silently change the behavior is probably not right. That is, if the user calls close() in the callback there is actually some misunderstanding and they need to think more; silently making this not block will hide the problem from them, which is the opposite of what we want.

-Jay

On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: [...]

Thanks again for all the comments and suggestions!

Jiangjie (Becket) Qin

On 3/10/15, 8:57 PM, Jiangjie Qin j...@linkedin.com wrote:

The KIP page has been updated per Jay's comments. I'd like to initiate the voting process if no further comments are received by tomorrow.
Jiangjie (Becket) Qin

On 3/8/15, 9:45 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Jiangjie, Can you capture the full motivation and use cases for the feature? This mentions your interest in having a way of aborting from inside the Callback, but it doesn't really explain that usage or why other people would want to do that. It also doesn't list the primary use case for having close with a bounded timeout, which was to avoid blocking too long on shutdown. -Jay

On Sat, Mar 7, 2015 at 12:25 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi, I just created a KIP for adding a close(timeout) to the new producer. Most of the previous discussion is in KAFKA-1660, where Parth Brahmbhatt has already done a lot of work. Since this is an interface change, we are going through the KIP process. Here is the KIP link: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53739782

Thanks. Jiangjie (Becket) Qin
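Jay's counter-proposal earlier in the thread (close(0) waits zero time, only the no-arg close() blocks indefinitely) can be modeled the same way as the KIP contract. This is a toy sketch, not the real KafkaProducer API; the latch stands in for "all pending sends completed":

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model of the alternative semantics: no magic -1 constant exists,
// negative timeouts are rejected outright, close(0) returns immediately,
// and the overload with no timeout is the only way to wait forever.
class ProposedCloser {
    static void close(CountDownLatch pending) { // close(): block until done
        try {
            pending.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static boolean close(long timeoutMs, CountDownLatch pending) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeout must be >= 0");
        }
        try {
            // close(0) returns at once; close(t) waits at most t milliseconds.
            return pending.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note how the overload resolution does the work the magic constant did: the indefinite wait is selected by the method signature rather than by a sentinel value, which is the crux of Jay's objection.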