[jira] [Created] (KAFKA-1966) Cannot read just created index
Igor Artamonov created KAFKA-1966: - Summary: Cannot read just created index Key: KAFKA-1966 URL: https://issues.apache.org/jira/browse/KAFKA-1966 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.0 Environment: Docker FROM dockerfile/ubuntu Oracle Java 1.7.0_72 Kafka Scala 2.11 Reporter: Igor Artamonov Priority: Critical Created a fresh {{test}} topic using {{kafka-console-producer.sh}}, but Kafka fails to process this topic. It cannot even read the index: {code} kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka-logs/test-0/.index --deep-iteration Dumping /kafka-logs/test-0/.index Exception in thread main java.io.IOException: Invalid argument at sun.nio.ch.FileChannelImpl.map0(Native Method) at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:874) at kafka.log.OffsetIndex.init(OffsetIndex.scala:74) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] KIP-8: Add a flush() method to the new producer
+1 On 2/19/15 12:08 PM, Neha Narkhede n...@confluent.io wrote: +1 (binding) On Thu, Feb 19, 2015 at 6:29 AM, Joel Koshy jjkosh...@gmail.com wrote: +1 (binding) On Wed, Feb 18, 2015 at 07:03:26PM -0500, Joe Stein wrote: +1 binding ~ Joestein On Feb 18, 2015 6:50 PM, Jay Kreps j...@confluent.io wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API +1 binding -Jay -- Thanks, Neha
Re: [VOTE] KIP-8: Add a flush() method to the new producer
+1 (binding) On Thu, Feb 19, 2015 at 6:29 AM, Joel Koshy jjkosh...@gmail.com wrote: +1 (binding) On Wed, Feb 18, 2015 at 07:03:26PM -0500, Joe Stein wrote: +1 binding ~ Joestein On Feb 18, 2015 6:50 PM, Jay Kreps j...@confluent.io wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API +1 binding -Jay -- Thanks, Neha
Re: Review Request 31140: Patch for KAFKA-1953
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31140/#review73176 --- Ship it! Ship It! - Guozhang Wang On Feb. 18, 2015, 2:23 a.m., Joel Koshy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31140/ --- (Updated Feb. 18, 2015, 2:23 a.m.) Review request for kafka. Bugs: KAFKA-1953 https://issues.apache.org/jira/browse/KAFKA-1953 Repository: kafka Description --- KAFKA-1953; KAFKA-1962; Disambiguate purgatory metrics; restore delayed request metrics Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 01cf1d91b7056bea7368ae4ea1e3c3646fc33619 core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 894d6edb4077cae081b9d4039353dd17e6f0c18f core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala 445bfa1bf8840620e10de2456875716dc66e789a core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b3b3749a21d35950a975e24dd9d1d53afbfaaee4 core/src/main/scala/kafka/server/DelayedFetch.scala dd602ee2e65c2cd4ec363c75fa5d0b3c038b1ed2 core/src/main/scala/kafka/server/DelayedOperation.scala fc06b01cad3a0497800df727fa2abf60772694f2 core/src/main/scala/kafka/server/DelayedProduce.scala c229088eb4f3db414225a688e149591ae0f810e7 core/src/main/scala/kafka/server/ReplicaManager.scala b82ff55e1dd1fe3fee2de5ab4bbddc91b0146601 core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 93f52d3222fc10b6d22ef6278365f6b026180418 Diff: https://reviews.apache.org/r/31140/diff/ Testing --- Thanks, Joel Koshy
[jira] [Commented] (KAFKA-1929) Convert core kafka module to use the errors in org.apache.kafka.common.errors
[ https://issues.apache.org/jira/browse/KAFKA-1929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14327974#comment-14327974 ] Jeff Holoman commented on KAFKA-1929: - Here's the list of duplicated errors:
InvalidTopicException
LeaderNotAvailable - Used in scala producer
NotEnoughReplicasException
NotEnoughReplicasAfterAppendException
NotLeaderForPartitionException
OffsetMetadataTooLarge (Renaming to OffsetMetadataTooLargeException)
OffsetOutOfRangeException
UnknownTopicOrPartitionException - Also in scala producer
In most cases removing these from core and replacing the ErrorMapping with the error from o.a.k is an easy fix, the only real difference being that the errors in o.a.k present a different exception hierarchy. All errors extend RuntimeException - KafkaException:
OffsetMetadataTooLarge | 12
NotEnoughReplicasException | 19
ApiException - InvalidTopicException | 17
ApiException - NotEnoughReplicasAfterAppendException | 20
ApiException - RetriableException - OffsetOutOfRange | 1
ApiException - RetriableException - UnknownTopicOrPartitionException | 3
ApiException - RetriableException - InvalidMetadataException - LeaderNotAvailableException | 5
ApiException - RetriableException - InvalidMetadataException - NotLeaderForPartitionException | 6
[~jkreps] you mentioned leaving the Scala clients as is; how do you want to handle UnknownTopicOrPartitionException and LeaderNotAvailable, which are in a number of different places in core? Additionally, I noticed that OFFSET_LOAD_IN_PROGRESS, CONSUMER_COORDINATOR_NOT_AVAILABLE, and NOT_COORDINATOR_FOR_CONSUMER (which map to 14, 15, 16 in the protocol) are not mapped in Errors.java to named exceptions like their counterparts, and are instead implemented as ApiException. Is it worth implementing classes for these for consistency? Thanks Jeff Convert core kafka module to use the errors in org.apache.kafka.common.errors - Key: KAFKA-1929 URL: https://issues.apache.org/jira/browse/KAFKA-1929 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Assignee: Jeff Holoman With the introduction of the common package there are now a lot of errors duplicated in both the common package and in the server. We should refactor the server code (but not the scala clients) to switch over to the exceptions in common. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
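To make the substitution concrete, a minimal sketch of swapping a core-side error for its org.apache.kafka.common.errors counterpart is shown below; the surrounding method, names, and bounds are illustrative and are not actual Kafka code.
{code}
import org.apache.kafka.common.errors.OffsetOutOfRangeException

object FetchOffsetCheck {
  // Illustrative only: throw the common-package exception instead of the old
  // kafka.common one when a fetch offset falls outside the log range.
  def validateFetchOffset(fetchOffset: Long, logStartOffset: Long, logEndOffset: Long): Unit = {
    if (fetchOffset < logStartOffset || fetchOffset > logEndOffset)
      throw new OffsetOutOfRangeException(
        s"Requested offset $fetchOffset is outside the range [$logStartOffset, $logEndOffset]")
  }
}
{code}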
[jira] [Commented] (KAFKA-1966) Cannot read just created index
[ https://issues.apache.org/jira/browse/KAFKA-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14327320#comment-14327320 ] Igor Artamonov commented on KAFKA-1966: --- Seems that it's a conflict with Docker mounted volumes. When I put the log data into a shared volume I get these errors (and console-producer/consumer throw errors as well). But if I use a local dir inside the current Docker container, it works fine: I can produce from the console on one VM and receive from another without problems. Docker 1.5, under Boot2docker and Mac OS. Cannot read just created index -- Key: KAFKA-1966 URL: https://issues.apache.org/jira/browse/KAFKA-1966 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.0 Environment: Docker FROM dockerfile/ubuntu Oracle Java 1.7.0_72 Kafka Scala 2.11 Kafka 0.8.2.0 Reporter: Igor Artamonov Priority: Critical Created a fresh {{test}} topic using {{kafka-console-producer.sh}}, but Kafka fails to process this topic. It cannot even read the index: {code} kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka-logs/test-0/.index --deep-iteration Dumping /kafka-logs/test-0/.index Exception in thread main java.io.IOException: Invalid argument at sun.nio.ch.FileChannelImpl.map0(Native Method) at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:874) at kafka.log.OffsetIndex.init(OffsetIndex.scala:74) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
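For reference, a minimal sketch of the mmap call OffsetIndex performs on startup (the FileChannelImpl.map frame in the stack trace above); on boot2docker shared volumes (vboxsf) this call can fail with "Invalid argument" because the underlying filesystem does not support memory mapping. The file name, size, and object name are illustrative.
{code}
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

// Standalone reproduction sketch: pre-allocate a file (as Kafka does for the
// offset index) and try to memory-map it on the volume under test.
object MmapCheck {
  def main(args: Array[String]): Unit = {
    val raf = new RandomAccessFile(new File(args(0)), "rw")
    try {
      raf.setLength(10 * 1024 * 1024)
      val buffer = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
      println("mmap succeeded, capacity = " + buffer.capacity())
    } finally {
      raf.close()
    }
  }
}
{code}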
[jira] [Commented] (KAFKA-1966) Cannot read just created index
[ https://issues.apache.org/jira/browse/KAFKA-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14327253#comment-14327253 ] Igor Artamonov commented on KAFKA-1966: --- Same for 0.8.2.0 + scala 2.10, and 0.8.1.1 + scala 2.9.2. So it could be an incompatibility with Docker Cannot read just created index -- Key: KAFKA-1966 URL: https://issues.apache.org/jira/browse/KAFKA-1966 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.0 Environment: Docker FROM dockerfile/ubuntu Oracle Java 1.7.0_72 Kafka Scala 2.11 Kafka 0.8.2.0 Reporter: Igor Artamonov Priority: Critical Created a fresh {{test}} topic by using {{kafka-console-producer.sh}}, but Kafka fails to process this topic. Even cannot read the index: {code} kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka-logs/test-0/.index --deep-iteration Dumping /kafka-logs/test-0/.index Exception in thread main java.io.IOException: Invalid argument at sun.nio.ch.FileChannelImpl.map0(Native Method) at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:874) at kafka.log.OffsetIndex.init(OffsetIndex.scala:74) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1967) Support more flexible serialization in Log4jAppender
[ https://issues.apache.org/jira/browse/KAFKA-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jesse Yates updated KAFKA-1967: --- Attachment: kafka-1967-trunk.patch Attaching initial patch that just lets subclasses override subAppend and return bytes. Support more flexible serialization in Log4jAppender Key: KAFKA-1967 URL: https://issues.apache.org/jira/browse/KAFKA-1967 Project: Kafka Issue Type: Improvement Reporter: Jesse Yates Priority: Minor Fix For: 0.8.3 Attachments: kafka-1967-trunk.patch It would be nice to allow subclasses of the standard KafkaLog4jAppender to be able to serialize the LoggingEvent however they choose, rather than always having to write out a string. A possible use case - the one I'm interested in - allows implementors to convert the event to any sort of bytes. This means downstream consumers don't lose data based on the logging format, but instead can get the entire event to do with as they please. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
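As a rough illustration of the use case, a subclass might look like the sketch below. It assumes the attached patch changes subAppend to return the bytes to publish (as described in the comment above); the subclass name and the JSON encoding are purely hypothetical.
{code}
import kafka.producer.KafkaLog4jAppender
import org.apache.log4j.spi.LoggingEvent

// Hypothetical subclass under the proposed patch: serialize the whole
// LoggingEvent instead of a layout-formatted string, so downstream consumers
// keep the structured fields.
class JsonKafkaLog4jAppender extends KafkaLog4jAppender {
  override def subAppend(event: LoggingEvent): Array[Byte] = {
    val json = s"""{"logger":"${event.getLoggerName}","level":"${event.getLevel}","message":"${event.getRenderedMessage}"}"""
    json.getBytes("UTF-8")
  }
}
{code}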
Re: Review Request 31199: Patch for KAFKA-1965
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31199/#review73168 --- Ship it! minor comment. core/src/main/scala/kafka/utils/DelayedItem.scala https://reviews.apache.org/r/31199/#comment119405 May be better to name the argument delayMs - Joel Koshy On Feb. 19, 2015, 5:51 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31199/ --- (Updated Feb. 19, 2015, 5:51 p.m.) Review request for kafka. Bugs: KAFKA-1965 https://issues.apache.org/jira/browse/KAFKA-1965 Repository: kafka Description --- leaner DelayedItem Diffs - core/src/main/scala/kafka/utils/DelayedItem.scala a4e0dabc858bc0081ba4fc0deea203bebd8bbf6b Diff: https://reviews.apache.org/r/31199/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Created] (KAFKA-1967) Support more flexible serialization in Log4jAppender
Jesse Yates created KAFKA-1967: -- Summary: Support more flexible serialization in Log4jAppender Key: KAFKA-1967 URL: https://issues.apache.org/jira/browse/KAFKA-1967 Project: Kafka Issue Type: Improvement Reporter: Jesse Yates Fix For: 0.8.3 It would be nice to allow subclasses of the standard KafkaLog4jAppender to be able to serialize the LoggingEvent however they choose, rather than always having to write out a string. A possible use case - the one I'm interested in - allows implementors to convert the event to any sort of bytes. This means downstream consumers don't lose data based on the logging format, but instead can get the entire event to do with as they please. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1967) Support more flexible serialization in Log4jAppender
[ https://issues.apache.org/jira/browse/KAFKA-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jesse Yates updated KAFKA-1967: --- Priority: Minor (was: Major) Support more flexible serialization in Log4jAppender Key: KAFKA-1967 URL: https://issues.apache.org/jira/browse/KAFKA-1967 Project: Kafka Issue Type: Improvement Reporter: Jesse Yates Priority: Minor Fix For: 0.8.3 It would be nice to allow subclasses of the standard KafkaLog4jAppender to be able to serialize the LoggingEvent however they choose, rather than always having to write out a string. A possible use case - the one I'm interested in - allows implementors to convert the event to any sort of bytes. This means downstream consumers don't lose data based on the logging format, but instead can get the entire event to do with as they please. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1615) Generating group ID in ZookeeperConsumerConnector shouldn't require local hostname to resolve
[ https://issues.apache.org/jira/browse/KAFKA-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328168#comment-14328168 ] Jonathan Rafalski commented on KAFKA-1615: -- Hello [~gwenshap], now that I have the source up and running I took a look at this. Correct me if I am wrong, but if we want to get the IP and we don't have a hostname, we need to use NetworkInterfaces and iterate through them to find the first interface with a non-loopback IP. Using that method, once I find the IP, the NetworkInterface class returns an InetAddress which could be IPv6 or IPv4, depending on what comes back first. If for whatever reason nothing is found (e.g. if the only NIC went down), the hostname will default to localhost (though since we have already checked the NIC connection to ZooKeeper, I don't think this case will ever happen). It seems like a very heavyweight process just to generate an ID, so I wanted to check before making the change. What do you think? Generating group ID in ZookeeperConsumerConnector shouldn't require local hostname to resolve - Key: KAFKA-1615 URL: https://issues.apache.org/jira/browse/KAFKA-1615 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: Gwen Shapira Priority: Minor Labels: newbie, usability ZookeeperConsumerConnector generates group ID by taking the local hostname: consumerUuid = %s-%d-%s.format( InetAddress.getLocalHost.getHostName, System.currentTimeMillis, uuid.getMostSignificantBits().toHexString.substring(0,8)) If localhost doesn't resolve (something that happens occasionally), this will fail with following error: Exception in thread main java.net.UnknownHostException: Billc-cent70x64: Billc-cent70x64: Name or service not known at java.net.InetAddress.getLocalHost(InetAddress.java:1473) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:119) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:142) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:149) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) Caused by: java.net.UnknownHostException: Billc-cent70x64: Name or service not known at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901) at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293) at java.net.InetAddress.getLocalHost(InetAddress.java:1469) ... 5 more Normally requiring a resolving localhost is not a problem, but here is seems a bit frivolous - its just for generating an ID, nothing network related. I think we can catch the exception and generate an ID without the hostname. This is low priority since the issue can be easily worked around (add the hostname to /etc/hosts) and since this API is going away anyway with the new consumer API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
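A minimal sketch of the lookup described in the comment above, assuming we walk the NICs, keep the first non-loopback address (IPv4 or IPv6, whichever enumerates first), and fall back to localhost when nothing is found; the object name and the fallback string are illustrative, not the final patch.
{code}
import java.net.NetworkInterface
import scala.collection.JavaConverters._

object HostnameFallback {
  // Walk all interfaces, keep the first non-loopback address, else "localhost".
  def firstNonLoopbackHost(): String = {
    val addresses = NetworkInterface.getNetworkInterfaces.asScala
      .flatMap(_.getInetAddresses.asScala)
      .filter(!_.isLoopbackAddress)
    if (addresses.hasNext) addresses.next().getHostAddress else "localhost"
  }
}
{code}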
[jira] [Updated] (KAFKA-1968) SimpleConsumer succeeds with embedded failure message
[ https://issues.apache.org/jira/browse/KAFKA-1968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Keith Bloomfield updated KAFKA-1968: Description: I am using a modified version of the Kafka SimpleConsumer to expose offsets to a rest api, based on: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example However, the behavior of the example is inconsistent with the shell command: ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker ... In that sometimes it works, and sometimes it returns successfully with an embedded error. In other words, the check: if (response.hasError()) {...} passes because the response (as far as it's aware) has no error. But if I drop in a: response.toString().contains(error) then sure enough, there is an error in the response. The error is: kafka.common.UnknownException Not sure what is happening in Kafka to result in this behavior. was: I am using a modified version of the Kafka SimpleConsumer to expose offsets to a rest api, based on: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example However, the behavior of the example is inconsistent with the shell command: ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker ... In that sometimes it works, and sometimes it returns successfully with an embedded error. In other words, the check: if (response.hasError()) {...} is entirely useless because the response (as far as it's aware) has no error. But if I drop in a: response.toString().contains(error) then sure enough, there is an error in the response. The error is: kafka.common.UnknownException Not sure what is happening in Kafka to result in this behavior. SimpleConsumer succeeds with embedded failure message - Key: KAFKA-1968 URL: https://issues.apache.org/jira/browse/KAFKA-1968 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.1.1 Environment: javaapi on Java 6 Reporter: Keith Bloomfield I am using a modified version of the Kafka SimpleConsumer to expose offsets to a rest api, based on: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example However, the behavior of the example is inconsistent with the shell command: ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker ... In that sometimes it works, and sometimes it returns successfully with an embedded error. In other words, the check: if (response.hasError()) {...} passes because the response (as far as it's aware) has no error. But if I drop in a: response.toString().contains(error) then sure enough, there is an error in the response. The error is: kafka.common.UnknownException Not sure what is happening in Kafka to result in this behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1968) SimpleConsumer succeeds with embedded failure message
Keith Bloomfield created KAFKA-1968: --- Summary: SimpleConsumer succeeds with embedded failure message Key: KAFKA-1968 URL: https://issues.apache.org/jira/browse/KAFKA-1968 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.1.1 Environment: javaapi on Java 6 Reporter: Keith Bloomfield I am using a modified version of the Kafka SimpleConsumer to expose offsets to a rest api, based on: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example However, the behavior of the example is inconsistent with the shell command: ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker ... In that sometimes it works, and sometimes it returns successfully with an embedded error. In other words, the check: if (response.hasError()) {...} is entirely useless because the response (as far as it's aware) has no error. But if I drop in a: response.toString().contains(error) then sure enough, there is an error in the response. The error is: kafka.common.UnknownException Not sure what is happening in Kafka to result in this behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
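As a diagnostic aid (not a fix), the per-partition error code can be inspected directly instead of relying only on the aggregate hasError() flag, along the lines of the sketch below; it follows the javaapi classes used in the SimpleConsumer example, and the object name and offset handling are illustrative.
{code}
import kafka.common.ErrorMapping
import kafka.javaapi.OffsetResponse

object OffsetCheck {
  // Surface a per-partition error (e.g. kafka.common.UnknownException) even if the
  // top-level flag looks clean, then return the first offset for the partition.
  def latestOffsetOrThrow(response: OffsetResponse, topic: String, partition: Int): Long = {
    val code = response.errorCode(topic, partition)
    if (code != ErrorMapping.NoError)
      throw ErrorMapping.exceptionFor(code)
    response.offsets(topic, partition).head
  }
}
{code}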
[jira] [Comment Edited] (KAFKA-1416) Unify sendMessages/getMessages in unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328188#comment-14328188 ] Flutra Osmani edited comment on KAFKA-1416 at 2/19/15 10:09 PM: May I (or how do I) assign this task to myself? I'd like to take a stab at it. was (Author: futtre): May I (or how do I) assign this task to myself? Unify sendMessages/getMessages in unit tests Key: KAFKA-1416 URL: https://issues.apache.org/jira/browse/KAFKA-1416 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Multiple unit tests have its own internal function to send/get messages from the brokers. For example: sendMessages in ZookeeperConsumerConnectorTest produceMessage in UncleanLeaderElectionTest sendMessages in FetcherTest etc It is better to unify them in TestUtils. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1416) Unify sendMessages/getMessages in unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328188#comment-14328188 ] Flutra Osmani commented on KAFKA-1416: -- May I (or how do I) assign this task to myself? Unify sendMessages/getMessages in unit tests Key: KAFKA-1416 URL: https://issues.apache.org/jira/browse/KAFKA-1416 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Multiple unit tests have its own internal function to send/get messages from the brokers. For example: sendMessages in ZookeeperConsumerConnectorTest produceMessage in UncleanLeaderElectionTest sendMessages in FetcherTest etc It is better to unify them in TestUtils. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31199: Patch for KAFKA-1965
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31199/#review73253 --- Ship it! Other than the minor renaming comment, the patch LGTM. I can help check in the updated patch. core/src/main/scala/kafka/utils/DelayedItem.scala https://reviews.apache.org/r/31199/#comment119532 We have been following a convention of appending the timeunit for variables that hold a timestamp value. So I'd suggest renaming: due -> dueMs, delay -> delayMs - Neha Narkhede On Feb. 19, 2015, 5:51 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31199/ --- (Updated Feb. 19, 2015, 5:51 p.m.) Review request for kafka. Bugs: KAFKA-1965 https://issues.apache.org/jira/browse/KAFKA-1965 Repository: kafka Description --- leaner DelayedItem Diffs - core/src/main/scala/kafka/utils/DelayedItem.scala a4e0dabc858bc0081ba4fc0deea203bebd8bbf6b Diff: https://reviews.apache.org/r/31199/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Commented] (KAFKA-1887) controller error message on shutting the last broker
[ https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328572#comment-14328572 ] Neha Narkhede commented on KAFKA-1887: -- Agree this error is really annoying. It will be great to squeeze this in 0.8.2.1 [~sriharsha] This might help- {code} if (canShutdown) { Utils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) -if(kafkaHealthcheck != null) - Utils.swallow(kafkaHealthcheck.shutdown()) if(socketServer != null) Utils.swallow(socketServer.shutdown()) if(requestHandlerPool != null) @@ -329,6 +327,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg Utils.swallow(consumerCoordinator.shutdown()) if(kafkaController != null) Utils.swallow(kafkaController.shutdown()) +if(kafkaHealthcheck != null) + Utils.swallow(kafkaHealthcheck.shutdown()) if(zkClient != null) Utils.swallow(zkClient.close()) {code} controller error message on shutting the last broker Key: KAFKA-1887 URL: https://issues.apache.org/jira/browse/KAFKA-1887 Project: Kafka Issue Type: Bug Components: core Reporter: Jun Rao Assignee: Sriharsha Chintalapani Priority: Minor Fix For: 0.8.3 We always see the following error in state-change log on shutting down the last broker. [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger) kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)] at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75) at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357) at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120) at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117) at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1615) Generating group ID in ZookeeperConsumerConnector shouldn't require local hostname to resolve
[ https://issues.apache.org/jira/browse/KAFKA-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328553#comment-14328553 ] Neha Narkhede commented on KAFKA-1615: -- [~jrafalski] Seems like a workable approach. We have to keep either the hostname or the IP in the consumer uuid. Generating group ID in ZookeeperConsumerConnector shouldn't require local hostname to resolve - Key: KAFKA-1615 URL: https://issues.apache.org/jira/browse/KAFKA-1615 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Reporter: Gwen Shapira Assignee: Gwen Shapira Priority: Minor Labels: newbie, usability ZookeeperConsumerConnector generates group ID by taking the local hostname: consumerUuid = %s-%d-%s.format( InetAddress.getLocalHost.getHostName, System.currentTimeMillis, uuid.getMostSignificantBits().toHexString.substring(0,8)) If localhost doesn't resolve (something that happens occasionally), this will fail with following error: Exception in thread main java.net.UnknownHostException: Billc-cent70x64: Billc-cent70x64: Name or service not known at java.net.InetAddress.getLocalHost(InetAddress.java:1473) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:119) at kafka.consumer.ZookeeperConsumerConnector.init(ZookeeperConsumerConnector.scala:142) at kafka.consumer.Consumer$.create(ConsumerConnector.scala:89) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:149) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) Caused by: java.net.UnknownHostException: Billc-cent70x64: Name or service not known at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:901) at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1293) at java.net.InetAddress.getLocalHost(InetAddress.java:1469) ... 5 more Normally requiring a resolving localhost is not a problem, but here is seems a bit frivolous - its just for generating an ID, nothing network related. I think we can catch the exception and generate an ID without the hostname. This is low priority since the issue can be easily worked around (add the hostname to /etc/hosts) and since this API is going away anyway with the new consumer API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1968) SimpleConsumer succeeds with embedded failure message
[ https://issues.apache.org/jira/browse/KAFKA-1968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1968. -- Resolution: Not a Problem This is easier to discuss on the mailing list first to determine if it is a bug or not. Feel free to reopen if we determine it is a problem through the mailing list discussion SimpleConsumer succeeds with embedded failure message - Key: KAFKA-1968 URL: https://issues.apache.org/jira/browse/KAFKA-1968 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.8.1.1 Environment: javaapi on Java 6 Reporter: Keith Bloomfield I am using a modified version of the Kafka SimpleConsumer to expose offsets to a rest api, based on: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example However, the behavior of the example is inconsistent with the shell command: ./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker ... In that sometimes it works, and sometimes it returns successfully with an embedded error. In other words, the check: if (response.hasError()) {...} passes because the response (as far as it's aware) has no error. But if I drop in a: response.toString().contains(error) then sure enough, there is an error in the response. The error is: kafka.common.UnknownException Not sure what is happening in Kafka to result in this behavior. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-1969) NPE in unit test for new consumer
Neha Narkhede created KAFKA-1969: Summary: NPE in unit test for new consumer Key: KAFKA-1969 URL: https://issues.apache.org/jira/browse/KAFKA-1969 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede {code} kafka.api.ConsumerTest testConsumptionWithBrokerFailures FAILED java.lang.NullPointerException at org.apache.kafka.clients.consumer.KafkaConsumer.ensureCoordinatorReady(KafkaConsumer.java:1238) at org.apache.kafka.clients.consumer.KafkaConsumer.initiateCoordinatorRequest(KafkaConsumer.java:1189) at org.apache.kafka.clients.consumer.KafkaConsumer.commit(KafkaConsumer.java:777) at org.apache.kafka.clients.consumer.KafkaConsumer.commit(KafkaConsumer.java:816) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:704) at kafka.api.ConsumerTest.consumeWithBrokerFailures(ConsumerTest.scala:167) at kafka.api.ConsumerTest.testConsumptionWithBrokerFailures(ConsumerTest.scala:152) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1964) testPartitionReassignmentCallback hangs occasionally
[ https://issues.apache.org/jira/browse/KAFKA-1964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1964: - Component/s: admin testPartitionReassignmentCallback hangs occasionally - Key: KAFKA-1964 URL: https://issues.apache.org/jira/browse/KAFKA-1964 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.3 Reporter: Jun Rao Labels: newbie++ Attachments: stack.out -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1964) testPartitionReassignmentCallback hangs occasionally
[ https://issues.apache.org/jira/browse/KAFKA-1964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1964: - Labels: newbie++ (was: ) testPartitionReassignmentCallback hangs occasionally - Key: KAFKA-1964 URL: https://issues.apache.org/jira/browse/KAFKA-1964 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.3 Reporter: Jun Rao Labels: newbie++ Attachments: stack.out -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1962) Restore delayed request metrics
[ https://issues.apache.org/jira/browse/KAFKA-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1962: - Component/s: purgatory Restore delayed request metrics --- Key: KAFKA-1962 URL: https://issues.apache.org/jira/browse/KAFKA-1962 Project: Kafka Issue Type: Sub-task Components: purgatory Reporter: Joel Koshy Assignee: Joel Koshy It seems we have lost the delayed request metrics that we had before: Producer/Fetch(follower/consumer) expires-per-second -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1966) Cannot read just created index
[ https://issues.apache.org/jira/browse/KAFKA-1966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede resolved KAFKA-1966. -- Resolution: Won't Fix It is easier to discuss these types of issues on the mailing list and create a JIRA if it is determined to be something that should be fixed in kafka. Cannot read just created index -- Key: KAFKA-1966 URL: https://issues.apache.org/jira/browse/KAFKA-1966 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.0 Environment: Docker FROM dockerfile/ubuntu Oracle Java 1.7.0_72 Kafka Scala 2.11 Kafka 0.8.2.0 Reporter: Igor Artamonov Priority: Critical Created a fresh {{test}} topic by using {{kafka-console-producer.sh}}, but Kafka fails to process this topic: {code} WARN Failed to send producer request with correlation id 24 to broker 1 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler) java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.writev0(Native Method) at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51) at sun.nio.ch.IOUtil.write(IOUtil.java:148) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524) at java.nio.channels.SocketChannel.write(SocketChannel.java:493) at kafka.network.BoundedByteBufferSend.writeTo(BoundedByteBufferSend.scala:56) at kafka.network.Send$class.writeCompletely(Transmission.scala:75) at kafka.network.BoundedByteBufferSend.writeCompletely(BoundedByteBufferSend.scala:26) at kafka.network.BlockingChannel.send(BlockingChannel.scala:92) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:72) {code} Cannot receive it using {{kafka-console-receiver.sh}} too, getting infinite: {code} WARN [console-consumer-65093_a3607751fab8-1424346729085-3a3736f8-leader-finder-thread], Failed to add leader for partitions [test,0],[test,1]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread) kafka.common.NotLeaderForPartitionException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:379) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73) at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160) {code} Even cannot read the index: {code} kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka-logs/test-0/.index --deep-iteration Dumping /kafka-logs/test-0/.index Exception in thread main java.io.IOException: Invalid argument at sun.nio.ch.FileChannelImpl.map0(Native Method) at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:874) at kafka.log.OffsetIndex.init(OffsetIndex.scala:74) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1694) kafka command line and centralized operations
[ https://issues.apache.org/jira/browse/KAFKA-1694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14327691#comment-14327691 ] Guozhang Wang commented on KAFKA-1694: -- Sounds great, thanks! kafka command line and centralized operations - Key: KAFKA-1694 URL: https://issues.apache.org/jira/browse/KAFKA-1694 Project: Kafka Issue Type: Bug Reporter: Joe Stein Assignee: Andrii Biletskyi Priority: Critical Fix For: 0.8.3 Attachments: KAFKA-1694.patch, KAFKA-1694_2014-12-24_21:21:51.patch, KAFKA-1694_2015-01-12_15:28:41.patch, KAFKA-1694_2015-01-12_18:54:48.patch, KAFKA-1694_2015-01-13_19:30:11.patch, KAFKA-1694_2015-01-14_15:42:12.patch, KAFKA-1694_2015-01-14_18:07:39.patch, KAFKA-1772_1802_1775_1774_v2.patch https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 29301: Patch for KAFKA-1694
On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/server/TopicCommandHelper.scala, lines 1-17 https://reviews.apache.org/r/29301/diff/7/?file=821380#file821380line1 One general comment: For some topic commands, why use AdminUtils to write the ZK path again instead of handling it via the controller directly? Or is this still WIP? Andrii Biletskyi wrote: Not sure I understand you. You mean technically calling ZK client from Controller class, not through TopicCommandHelper? If so - it's just to leave KafkaApi clean and small. Guozhang Wang wrote: For example, upon receiving a create-topic request, the helper class will call AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK() which will just write this request to ZK admin path for it to be captured by controller; however since only the broker with the active controller will receive such requests why don't we just hand off the request from KafkaApi to the controller to handle it. One question, though, is that we need to make sure concurrency is correct for controller handling multiple such tasks, and we have some thoughts about how to deal with such cases (see Jiangjie and my comments in KAFKA-1305). Andrii Biletskyi wrote: Thanks for the explanation. So instead of the current workflow: CreateTopicRequest -> Helper class -> AdminUtils -> zk path is created -> Controller's changeTopicListener picks up the change -> topic is created You propose: CreateTopicRequest -> Controller directly executes logic from ChangeTopicListener ? Very interesting idea! Can we make a separate ticket for that? I tried to port TopicCommand as is in order to have, at least for now, a working end-to-end infrastructure to handle Admin commands. I believe this is more like refactoring TopicCommand (probably delete- and alterTopic should be changed too). I'm a bit concerned that adding this refactoring will require additional effort to test (especially taking into account your note about KAFKA-1305) and time to agree on the approach we will use to address this issue. Agree. On Feb. 3, 2015, 7:14 p.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/common/requests/admin/AbstractAdminRequest.java, lines 1-28 https://reviews.apache.org/r/29301/diff/7/?file=821321#file821321line1 Wondering if an abstract admin request is necessary, as it does not have many common interface functions. Andrii Biletskyi wrote: This is needed to avoid code duplication in admin clients. See RequestDispatcher for example. You will need to call an admin request and get a response of that type. Having AbstractAdminRequest (specifically createResponseCounterpart) lets you have: ``` public <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> abstractRequest) throws Exception { ``` Instead of sendCreateTopicRequest, sendAlter... etc. If there is a better and cleaner way to achieve this - please let me know. Guozhang Wang wrote: I see. How about changing sendAdminRequest(AbstractAdminRequest<T>) to sendRequest(ClientRequest) and the caller like AlterTopicCommand.execute() will be: AlterTopicRequest alterTopicRequest = // create the request ClientRequest request = new ClientRequest(new RequestSend(...) ...) dispatcher.sendRequest(request) This way we are duplicating the second line here in every upper-level class, while saving the admin interface. I actually do not know which one is better.. Andrii Biletskyi wrote:
Let me continue your example: AlterTopicRequest alterTopicRequest = // create the request ClientRequest request = new ClientRequest(new RequestSend(...) ...) __ClientResponse response = dispatcher.sendRequest(request, ApiKeys.ALTER_TOPIC)__ __AlterTopicResponse alterTopicResponse = new AlterTopicResponse(response.responseBody())__ alterTopicResponse.// now get what you need from typed response And you will have this NetworkClient-related stuff (RequestSend, ClientRequest ...) everywhere in your client code. But it looks pretty strange that you can't have a generic method to send a request and immediately get a response of the required type. So really RequestDispatcher already has sendRequest() as you suggest; with sendAdminRequest I tried to address the issue of getting the response counterpart. But I agree that solution might mislead people, so if it isn't worth it - I'm okay to remove the intermediate AbstractAdminRequest/Response. Makes sense, I am now OK with the admin request interface. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29301/#review70790 --- On Jan. 14, 2015,
Re: Scala IDE debugging Unit Test Issues
Wiki link on Eclipse setup: https://cwiki.apache.org/confluence/display/KAFKA/Eclipse-Scala-Gradle-Git+Developement+Environment+Setup On Thu, Feb 19, 2015 at 10:03 PM, Jonathan Rafalski jonathan.rafal...@gmail.com wrote: Hello again, Sorry again to send you guys such a generic error. It seems Eclipse did not want to give me any good error messages. I switched over to IntelliJ and was able to get everything up and running after resolving two blockers: 1) Under Settings > Build, Execution, Deployment > Scala Compiler, the core project's additional compiler options had -target set to jvm-1.8, which it seems is not supported by Scala 2.11. Removing that option and running under JDK 1.7 got me past that. 2) Under Project Structure > Project Settings > Modules, the Kafka module's compile output had the same path for both output and test output, which was blocking the compiler. I will go back to the Eclipse/Scala IDE setup, see if these two issues were also the problem there, and in the end will create a write-up on my adventures for review. Sorry again about the total newbieness of my prior email. I will work harder on digging deeper before my next query to the list. Jonathan. On Tue, Feb 17, 2015 at 10:09 PM, Jonathan Rafalski jonathan.rafal...@gmail.com wrote: Hello all, Completely new to Kafka and Scala, but I thought I would get my feet wet with a few of the newbie tasks. I was able to get the source up and running in the Scala IDE and I am able to debug the examples. However, when I try to debug any of the unit tests in core (for example the unit.kafka.consumer.zookeeperconsumerconnectortest class) I get: java.lang.ClassNotFoundException: Class not found unit.kafka.consumer.ZookeeperConsumerConnectorTest I have searched the normal sites (SE and mail archives) and attempted a few solutions (adding physical directories of the .class and .scala files to the build path, adding JUnit libraries) but to no avail. My thought is this is due to the fact that the package declaration on the unit tests points to the main packages, not the unit test package, which is causing Eclipse to freak out (though I might be way off base). Also, since I am just starting and have no allegiances yet, is Eclipse the preferred IDE here or should I be going with IntelliJ? I apologize for the complete newb question here, but I would be grateful for any help on setup to get these unit tests up and running so I can start contributing. Thank you again. Jonathan.
[jira] [Updated] (KAFKA-1965) Leaner DelayedItem
[ https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-1965: Attachment: KAFKA-1965.patch Leaner DelayedItem -- Key: KAFKA-1965 URL: https://issues.apache.org/jira/browse/KAFKA-1965 Project: Kafka Issue Type: Improvement Components: purgatory Reporter: Yasuhiro Matsuda Assignee: Joel Koshy Priority: Trivial Attachments: KAFKA-1965.patch, KAFKA-1965.patch In DelayedItem, which is a superclass of DelayedOperation, both the creation timestamp and the length delay are stored. However, all it needs is one timestamp that is the due of the item. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
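A sketch of the leaner shape described above, keeping only the absolute due time; the names (delayMs, dueMs) follow the review comments on this patch and are not necessarily the committed code.
{code}
import java.util.concurrent.{Delayed, TimeUnit}

// Store a single absolute timestamp instead of (creation time, delay).
class DelayedItem(delayMs: Long) extends Delayed {
  private val dueMs = System.currentTimeMillis + delayMs

  def getDelay(unit: TimeUnit): Long =
    unit.convert(math.max(dueMs - System.currentTimeMillis, 0), TimeUnit.MILLISECONDS)

  def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS))
}
{code}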
[jira] [Commented] (KAFKA-1965) Leaner DelayedItem
[ https://issues.apache.org/jira/browse/KAFKA-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14327795#comment-14327795 ] Yasuhiro Matsuda commented on KAFKA-1965: - Created reviewboard https://reviews.apache.org/r/31199/diff/ against branch origin/trunk Leaner DelayedItem -- Key: KAFKA-1965 URL: https://issues.apache.org/jira/browse/KAFKA-1965 Project: Kafka Issue Type: Improvement Components: purgatory Reporter: Yasuhiro Matsuda Assignee: Joel Koshy Priority: Trivial Attachments: KAFKA-1965.patch, KAFKA-1965.patch In DelayedItem, which is a superclass of DelayedOperation, both the creation timestamp and the length delay are stored. However, all it needs is one timestamp that is the due of the item. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328318#comment-14328318 ] Neha Narkhede commented on KAFKA-1546: -- bq. For example a replica could be catching up quickly but the replica.lag.max.ms counter would still increase until it fully catches up and then it will abruptly drop to zero. What we want to measure is not *exactly* how slow it is but express lag in terms of maximum time spent not catching up with the leader. The check that Jay mentions can be improved a little. Basically, if the replica is at the log end offset, then we don't want to check for lagBegin at all. The first time a replica starts lagging, you set the lagBegin to the current time. From there on, you only reset it to -1 if it reaches the log end offset. This will remove a replica that keeps fetching but is unable to catch up with the leader for replica.lag.max.ms. So the check is more like:
{code}
if(!fetchedData.readToEndOfLog) {
  if(lagBegin == -1) {
    this.lagBegin = System.currentTimeMillis()
  }
} else {
  this.lagBegin = -1
}
{code}
Then the liveness criteria is: partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS
In order to do this, LogReadResult might have to return the log end offset as well. Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
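Putting the check and the liveness criterion above into one self-contained sketch; the class and field names (ReplicaLagTracker, lagBeginMs, replicaLagTimeMaxMs) are illustrative, and the real change would live in the replica/fetch path rather than a standalone class.
{code}
// Track how long a follower has continuously failed to reach the log end offset.
class ReplicaLagTracker(replicaLagTimeMaxMs: Long) {
  @volatile private var lagBeginMs: Long = -1L

  def onFetch(readToEndOfLog: Boolean): Unit = {
    if (!readToEndOfLog) {
      if (lagBeginMs == -1L) lagBeginMs = System.currentTimeMillis()  // start the clock only once
    } else {
      lagBeginMs = -1L  // caught up: reset
    }
  }

  def isLagging: Boolean =
    lagBeginMs > 0 && (System.currentTimeMillis() - lagBeginMs) > replicaLagTimeMaxMs
}
{code}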
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328260#comment-14328260 ] Aditya Auradkar commented on KAFKA-1546: I do have a concern about the heuristic. [~jkreps] Using your example: if(!fetchedData.readToEndOfLog) this.lagBegin = System.currentTimeMillis() else this.lagBegin = -1 Then the liveness criteria is: partitionLagging = this.lagBegin > 0 && System.currentTimeMillis() - this.lagBegin > REPLICA_LAG_TIME_MS The time counter starts when the read doesn't go to the end of the log and only stops when it does reach the end. In this case, the lag measures the absolute duration of time for which this replica is lagging but not how far behind it is in terms of applying commits. For example a replica could be catching up quickly but the replica.lag.max.ms counter would still increase until it fully catches up and then it will abruptly drop to zero. Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328413#comment-14328413 ] Jiangjie Qin commented on KAFKA-1546: - I'm not sure if my concern is valid. If we have many producers producing messages to a partition, it's possible that after we fulfill a fetch request from the replica fetcher but before we check whether the log is caught up to the log end or not, some new messages are appended. In this case, we will never be able to really catch up to the log end. Maybe I understood it wrong, but I think what [~nehanarkhede] proposed before seems to work, which is: 1. Have a time criterion: a fetch request must be received from the follower within 10 secs. 2. Instead of a fixed max message lag, say 4000, use (message-in-rate * maxLagMs) as the max message lag threshold. This way we can handle both busy topics and low-volume topics. Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
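The second point above amounts to deriving the message-count threshold from the partition's observed produce rate rather than a fixed constant, roughly as in this sketch; the object and parameter names are illustrative.
{code}
object ReplicaLagThreshold {
  // Allowed message lag scales with throughput: produce rate (msgs/sec) * allowed lag time.
  def maxMessageLag(messagesInPerSec: Double, replicaLagTimeMaxMs: Long): Long =
    math.max(1L, (messagesInPerSec * replicaLagTimeMaxMs / 1000.0).toLong)
}
{code}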
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328427#comment-14328427 ] Aditya Auradkar commented on KAFKA-1546: I agree we should model this in terms of time and not in terms of messages. While I think it is a bit more natural to model replication lag in terms of will take more than N ms to catch up., I also agree it is tricky to implement correctly. One possible way to model it is to associate an offset with a commit timestamp at the source. For example, assume that a message with offset O is produced on the leader for partition X at timestamp T1. If the time now is T2 and a replica's log end offset is O (i.e. it is has consumed till O), then the lag can be (T2-T1). Is there any easy way to obtain the source timestamp given an offset? If this isn't feasible, then I do think that the originally proposed heuristic is a good one.. and I will submit a patch for it. Also, there are currently 2 checks for replica lag (in ISR). a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose. b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well. Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1729) add doc for Kafka-based offset management in 0.8.2
[ https://issues.apache.org/jira/browse/KAFKA-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328493#comment-14328493 ] Joel Koshy commented on KAFKA-1729: --- Need to also update the protocol guide wiki with later versions of OCR/OFR add doc for Kafka-based offset management in 0.8.2 -- Key: KAFKA-1729 URL: https://issues.apache.org/jira/browse/KAFKA-1729 Project: Kafka Issue Type: Sub-task Reporter: Jun Rao Assignee: Joel Koshy Fix For: 0.8.2.0 Attachments: KAFKA-1729.patch, KAFKA-1729.patch, KAFKA-1729_2015-02-18_17:30:37.patch, KAFKA-1782-doc-v1.patch, KAFKA-1782-doc-v2.patch, KAFKA-1782-doc-v3.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1952) High CPU Usage in 0.8.2 release
[ https://issues.apache.org/jira/browse/KAFKA-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328306#comment-14328306 ] Puneet Mehta commented on KAFKA-1952: - Hi Jun, Can this patch be applied against kafka 8.2 source? I am not sure since kafka 8.2 source doesn't have any file matching this path i.e '/core/src/main/scala/kafka/server/DelayedOperation.scala' Thanks, Puneet Mehta High CPU Usage in 0.8.2 release --- Key: KAFKA-1952 URL: https://issues.apache.org/jira/browse/KAFKA-1952 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Jay Kreps Assignee: Jun Rao Priority: Critical Fix For: 0.8.2.1 Attachments: kafka-1952.patch, kafka-1952.patch, kafka-1952_2015-02-15_15:26:33.patch Brokers with high partition count see increased CPU usage when migrating from 0.8.1.1 to 0.8.2. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1952) High CPU Usage in 0.8.2 release
[ https://issues.apache.org/jira/browse/KAFKA-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328371#comment-14328371 ] Jun Rao commented on KAFKA-1952: There is a separate patch for 0.8.2 and is always committed to the 0.8.2 branch. High CPU Usage in 0.8.2 release --- Key: KAFKA-1952 URL: https://issues.apache.org/jira/browse/KAFKA-1952 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.0 Reporter: Jay Kreps Assignee: Jun Rao Priority: Critical Fix For: 0.8.2.1 Attachments: kafka-1952.patch, kafka-1952.patch, kafka-1952_2015-02-15_15:26:33.patch Brokers with high partition count see increased CPU usage when migrating from 0.8.1.1 to 0.8.2. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328472#comment-14328472 ] Neha Narkhede commented on KAFKA-1546: -- bq. If this isn't feasible, then I do think that the heuristic proposed in Neha's comment is a good one.. and I will submit a patch for it. Sounds good. Will help you review it. bq. a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose. Correct. bq. b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well. Hmm, depends. There are 2 things we need to check - dead replicas and slow replicas. The dead replica check is to remove a replica that hasn't sent a fetch request to the leader for some time. Take the example of a replica that is in sync with the leader (lagBegin is -1), there aren't new messages coming in and it stops fetching entirely. We can remove the replica when there are new messages based on the lagBegin logic but really that replica should've been removed long before that, because it stopped fetching and was dead. The logic we have above works pretty well for slow replicas, but I think we still need to handle dead replicas for low-volume topics. Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328467#comment-14328467 ]

Joel Koshy commented on KAFKA-1546:
-----------------------------------

No, we don't have any timestamp currently associated with messages/offsets.

Automate replica lag tuning
---------------------------

Key: KAFKA-1546
URL: https://issues.apache.org/jira/browse/KAFKA-1546
Project: Kafka
Issue Type: Improvement
Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
Assignee: Aditya Auradkar
Labels: newbie++

Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328427#comment-14328427 ]

Aditya Auradkar edited comment on KAFKA-1546 at 2/20/15 1:30 AM:
-----------------------------------------------------------------

I agree we should model this in terms of time and not in terms of messages. While I think it is a bit more natural to model replication lag in terms of "will take more than N ms to catch up", I also agree it is tricky to implement correctly.

One possible way to model it is to associate an offset with a commit timestamp at the source. For example, assume that a message with offset O is produced on the leader for partition X at timestamp T1. If the time now is T2 and a replica's log end offset is O (i.e. it has consumed up to O), then the lag can be (T2 - T1). Is there any easy way to obtain the source timestamp given an offset?

If this isn't feasible, then I do think that the heuristic proposed in Neha's comment is a good one, and I will submit a patch for it.

Also, there are currently 2 checks for replica lag (in ISR):
a. keepInSyncMessages - This tracks replica lag as a function of the number of messages it is trailing behind. I believe we will remove this entirely regardless of the approach we choose.
b. keepInSyncTimeMs - This tracks the amount of time between fetch requests. I think we can remove this as well.

Automate replica lag tuning
---------------------------

Key: KAFKA-1546
URL: https://issues.apache.org/jira/browse/KAFKA-1546
Project: Kafka
Issue Type: Improvement
Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
Assignee: Aditya Auradkar
Labels: newbie++

Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
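[Editor's note] A small Scala sketch of the timestamp-based lag model Aditya describes above (lag = T2 - T1). This is purely illustrative: as Joel notes elsewhere in the thread, 0.8.x does not associate timestamps with offsets, so the commitTimeOf lookup below is a hypothetical function, not an existing Kafka API.

{code}
// Illustrative only; commitTimeOf is an assumed offset -> produce-timestamp lookup.
def replicaLagMs(replicaLogEndOffset: Long,
                 nowMs: Long,
                 commitTimeOf: Long => Option[Long]): Long =
  commitTimeOf(replicaLogEndOffset) match {
    case Some(t1) => nowMs - t1   // lag = T2 - T1 from the comment above
    case None     => 0L           // no timestamp available; treat as caught up
  }
{code}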
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328298#comment-14328298 ]

Jay Kreps commented on KAFKA-1546:
----------------------------------

Yeah, that is true. I think we are in agreement that we want to express this in terms of time, not number of messages. The criterion I was proposing is "not caught up for N ms", where "caught up" means having read to the end of the log. I think what you are proposing is "will take more than N ms to catch up".

Originally I had thought a little about this. However, this criterion is a lot harder to calculate. In order to predict the time to catch up you need to estimate the rate at which messages will be read in the future (e.g. if I am 1000 messages behind and reading at 500 msg/sec then it will take 2 seconds). I was concerned that any estimate would be really fragile, since the whole point of a failure is that it changes this kind of rate in some way (because a replica is slow, or messages got bigger, or whatever), so predictions based on past rates may be wrong once the (possibly) soft failure happens.

I think the motivation for the criterion I was proposing was that any caught-up reader should always be at the end of the log (that is the definition of caught up), and if you go for a period of time without being at the end, then likely you won't get to the end soon. You could imagine some situation in which somehow the follower was able to exactly keep up but was always one message behind the end of the log, in which case we would falsely detect the follower as failed. However, I think this would be unlikely, and detecting it as failed is actually probably okay, since you are exactly on the verge of being overwhelmed (one byte per second more throughput and you will be dead).

Let me know if you think that makes sense; I could definitely be convinced it would be better done a different way.

Automate replica lag tuning
---------------------------

Key: KAFKA-1546
URL: https://issues.apache.org/jira/browse/KAFKA-1546
Project: Kafka
Issue Type: Improvement
Components: replication
Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
Reporter: Neha Narkhede
Assignee: Aditya Auradkar
Labels: newbie++

Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
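[Editor's note] A short Scala sketch contrasting the two criteria Jay compares above. The field names and the rate-based estimate are assumptions for illustration, not Kafka code; criterion B shows why extrapolating a past fetch rate (e.g. 1000 messages behind at 500 msg/sec, roughly 2 seconds to catch up) is fragile once that rate changes during a failure.

{code}
// Illustrative only; names and the rate estimate are assumptions, not Kafka code.

// Criterion A ("not caught up for N ms"): cheap and robust to rate changes.
def notCaughtUpTooLong(lastCaughtUpTimeMs: Long, nowMs: Long, maxLagMs: Long): Boolean =
  (nowMs - lastCaughtUpTimeMs) > maxLagMs

// Criterion B ("will take more than N ms to catch up"): extrapolates the past
// fetch rate, e.g. 1000 messages behind at 500 msg/sec => ~2000 ms to catch up.
def estimatedCatchUpMs(messagesBehind: Long, recentFetchRatePerSec: Double): Double =
  if (recentFetchRatePerSec <= 0) Double.PositiveInfinity
  else messagesBehind / recentFetchRatePerSec * 1000
{code}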
[jira] [Commented] (KAFKA-1952) High CPU Usage in 0.8.2 release
[ https://issues.apache.org/jira/browse/KAFKA-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14328383#comment-14328383 ]

Puneet Mehta commented on KAFKA-1952:
-------------------------------------

Thanks for the clarification, I kind of figured it out from the kafka git repo.

High CPU Usage in 0.8.2 release
-------------------------------

Key: KAFKA-1952
URL: https://issues.apache.org/jira/browse/KAFKA-1952
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
Assignee: Jun Rao
Priority: Critical
Fix For: 0.8.2.1
Attachments: kafka-1952.patch, kafka-1952.patch, kafka-1952_2015-02-15_15:26:33.patch

Brokers with high partition count see increased CPU usage when migrating from 0.8.1.1 to 0.8.2.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)