[jira] [Created] (KAFKA-6797) Connect oracle database to kafka and stream data
Ujjwal Kumar created KAFKA-6797:
-----------------------------------

             Summary: Connect oracle database to kafka and stream data
                 Key: KAFKA-6797
                 URL: https://issues.apache.org/jira/browse/KAFKA-6797
             Project: Kafka
          Issue Type: Task
          Components: KafkaConnect
    Affects Versions: 1.0.1
         Environment: Windows OS
            Reporter: Ujjwal Kumar


Hi, please help me with the steps to connect Kafka with an Oracle DB (or any other DB). I tried searching online but couldn't find anything. My requirement is simple: Kafka should be able to get the data from an Oracle database, and we can consume that elsewhere. I have a Windows setup.


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
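[Editor's note] For readers with the same question: the usual approach is Kafka Connect with a JDBC source connector. The sketch below assumes the third-party Confluent kafka-connect-jdbc plugin (not shipped with Apache Kafka) plus an Oracle JDBC driver on the worker's plugin path; the host, credentials, table, and column names are placeholders, not from this issue.

```properties
# Hypothetical connector config for a standalone worker, e.g. oracle-source.properties.
name=oracle-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1
connection.user=kafka_user
connection.password=secret
# Poll new rows via a strictly increasing id column; "timestamp" and "bulk" modes also exist.
mode=incrementing
incrementing.column.name=ID
table.whitelist=ORDERS
# Each table becomes a topic named <topic.prefix><table>, here "oracle-ORDERS".
topic.prefix=oracle-
```

On Windows this would be launched with something like {{bin\windows\connect-standalone.bat config\connect-standalone.properties oracle-source.properties}}, after which a normal consumer can read the {{oracle-ORDERS}} topic.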
[jira] [Updated] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Widman updated KAFKA-6266: --- Affects Version/s: 1.0.1 > Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of > __consumer_offsets-xx to log start offset 203569 since the checkpointed > offset 120955 is invalid. (kafka.log.LogCleanerManager$) > -- > > Key: KAFKA-6266 > URL: https://issues.apache.org/jira/browse/KAFKA-6266 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.0.0, 1.0.1 > Environment: CentOS 7, Apache kafka_2.12-1.0.0 >Reporter: VinayKumar >Priority: Major > > I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below > warnings in the log. > I'm seeing these continuously in the log, and want these to be fixed- so > that they wont repeat. Can someone please help me in fixing the below > warnings. > {code} > WARN Resetting first dirty offset of __consumer_offsets-17 to log start > offset 3346 since the checkpointed offset 3332 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-23 to log start > offset 4 since the checkpointed offset 1 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-19 to log start > offset 203569 since the checkpointed offset 120955 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-35 to log start > offset 16957 since the checkpointed offset 7 is invalid. > (kafka.log.LogCleanerManager$) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440212#comment-16440212 ]

Jeff Widman edited comment on KAFKA-6266 at 4/17/18 1:07 AM:
-------------------------------------------------------------

I am hitting this after upgrading a 0.10.0.1 broker to 1.0.1.

The __consumer_offsets topic has the following config:
{code}
Topic: __consumer_offsets   PartitionCount: 157   ReplicationFactor: 2   Configs: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
{code}
Ignore the non-standard partition count; that is a byproduct of a bug from years ago. In this case, I think the only effect is that it makes it less likely that a partition within the __consumer_offsets topic gets produced to, which it sounds like would clear this error.

As described above, the external symptom was a zero-byte log file with a name like 00012526.log. Since this particular cluster has significantly more partitions in __consumer_offsets than it does consumer groups, it will not clear the error anytime soon, because no consumer group's offsets are being hashed onto the problem partitions.

So to get out of the situation, I shut down all brokers that have replicas of the partition, then deleted the log files for that partition, then restarted the brokers. This cleared the filename so that it matched the zero-byte contents. Note that doing this in production will require downtime, as you are taking a partition of the __consumer_offsets topic completely offline. On the flip side, you are only likely to hit this on somewhat underloaded clusters that can likely afford the downtime; typically busy production clusters will clear themselves automatically through consumer groups producing to this partition.


was (Author: jeffwidman):
I am hitting this after upgrading a 0.10.0.1 broker to 1.0.1.

The __consumer_offsets topic has the following config:
{code}
Topic: __consumer_offsets   PartitionCount: 157   ReplicationFactor: 2   Configs: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
{code}
Ignore the non-standard partition count; that is a byproduct of a bug from years ago. In this case, I think the only effect is that it makes it less likely that a partition within the __consumer_offsets topic gets produced to, which it sounds like would clear this error.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of > __consumer_offsets-xx to log start offset 203569 since the checkpointed > offset 120955 is invalid. (kafka.log.LogCleanerManager$) > -- > > Key: KAFKA-6266 > URL: https://issues.apache.org/jira/browse/KAFKA-6266 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.0.0 > Environment: CentOS 7, Apache kafka_2.12-1.0.0 >Reporter: VinayKumar >Priority: Major > > I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below > warnings in the log. > I'm seeing these continuously in the log, and want these to be fixed- so > that they wont repeat. Can someone please help me in fixing the below > warnings. > {code} > WARN Resetting first dirty offset of __consumer_offsets-17 to log start > offset 3346 since the checkpointed offset 3332 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-23 to log start > offset 4 since the checkpointed offset 1 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-19 to log start > offset 203569 since the checkpointed offset 120955 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-35 to log start > offset 16957 since the checkpointed offset 7 is invalid. > (kafka.log.LogCleanerManager$) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440212#comment-16440212 ] Jeff Widman commented on KAFKA-6266: I am hitting this after upgrading a 0.10.0.1 broker to 1.0.1. the __consumer_offsets topic has the following config: {code} Topic: __consumer_offsets   PartitionCount: 157   ReplicationFactor: 2   Configs: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer {code} Ignore the non-standard partition count, that is a byproduct of a bug from years ago. In this case, I think the only effect is that it makes it less likely that a partition within the __consumer_offsets topic gets produced to, which it sounds like would clear this error. > Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of > __consumer_offsets-xx to log start offset 203569 since the checkpointed > offset 120955 is invalid. (kafka.log.LogCleanerManager$) > -- > > Key: KAFKA-6266 > URL: https://issues.apache.org/jira/browse/KAFKA-6266 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.0.0 > Environment: CentOS 7, Apache kafka_2.12-1.0.0 >Reporter: VinayKumar >Priority: Major > > I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below > warnings in the log. > I'm seeing these continuously in the log, and want these to be fixed- so > that they wont repeat. Can someone please help me in fixing the below > warnings. > {code} > WARN Resetting first dirty offset of __consumer_offsets-17 to log start > offset 3346 since the checkpointed offset 3332 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-23 to log start > offset 4 since the checkpointed offset 1 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-19 to log start > offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-35 to log start > offset 16957 since the checkpointed offset 7 is invalid. > (kafka.log.LogCleanerManager$) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
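[Editor's note] Jeff's observation that the error only clears once some consumer group's offsets land on the affected partition follows from how Kafka maps a group id to a __consumer_offsets partition. A rough Python sketch of that mapping (mirroring Java's String.hashCode; Kafka's actual code is Scala's {{Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount}}, and the function names below are illustrative):

```python
def java_string_hashcode(s: str) -> int:
    """Reproduce Java's String.hashCode (31-based polynomial, signed 32-bit)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x1_0000_0000 if h >= 0x8000_0000 else h

def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Which __consumer_offsets partition stores this group's offsets.
    The & 0x7FFFFFFF matches Kafka's Utils.abs (a non-negative hash)."""
    return (java_string_hashcode(group_id) & 0x7FFFFFFF) % num_partitions
```

With 157 partitions and only a handful of groups, most partitions never receive a produce, so their invalid checkpoints persist, which is why the WARN repeats indefinitely on underloaded clusters.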
[jira] [Updated] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid
[ https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Widman updated KAFKA-6266: --- Description: I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below warnings in the log. I'm seeing these continuously in the log, and want these to be fixed- so that they wont repeat. Can someone please help me in fixing the below warnings. {code} WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 3346 since the checkpointed offset 3332 is invalid. (kafka.log.LogCleanerManager$) WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$) WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$) WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$) {code} was: I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below warnings in the log. I'm seeing these continuously in the log, and want these to be fixed- so that they wont repeat. Can someone please help me in fixing the below warnings. WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 3346 since the checkpointed offset 3332 is invalid. (kafka.log.LogCleanerManager$) WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$) WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 203569 since the checkpointed offset 120955 is invalid. (kafka.log.LogCleanerManager$) WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 16957 since the checkpointed offset 7 is invalid. 
(kafka.log.LogCleanerManager$) > Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of > __consumer_offsets-xx to log start offset 203569 since the checkpointed > offset 120955 is invalid. (kafka.log.LogCleanerManager$) > -- > > Key: KAFKA-6266 > URL: https://issues.apache.org/jira/browse/KAFKA-6266 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.0.0 > Environment: CentOS 7, Apache kafka_2.12-1.0.0 >Reporter: VinayKumar >Priority: Major > > I upgraded Kafka from 0.10.2.1 to 1.0.0 version. From then, I see the below > warnings in the log. > I'm seeing these continuously in the log, and want these to be fixed- so > that they wont repeat. Can someone please help me in fixing the below > warnings. > {code} > WARN Resetting first dirty offset of __consumer_offsets-17 to log start > offset 3346 since the checkpointed offset 3332 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-23 to log start > offset 4 since the checkpointed offset 1 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-19 to log start > offset 203569 since the checkpointed offset 120955 is invalid. > (kafka.log.LogCleanerManager$) > WARN Resetting first dirty offset of __consumer_offsets-35 to log start > offset 16957 since the checkpointed offset 7 is invalid. > (kafka.log.LogCleanerManager$) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
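[Editor's note] The WARN itself is the log cleaner noticing that its checkpointed "first dirty offset" has fallen outside the partition's current valid offset range (for example, the log start offset advanced past it after retention deleted segments), so cleaning restarts from the log start offset. A simplified sketch of that decision (hedged: kafka.log.LogCleanerManager's real logic; names and the exact upper-bound check here are illustrative):

```python
def resolve_first_dirty_offset(checkpointed: int, log_start: int, log_end: int) -> int:
    """Return where cleaning should resume. A checkpoint outside
    [log_start, log_end] is invalid; the cleaner then resets to log_start,
    which is when the 'Resetting first dirty offset ... is invalid' WARN
    is logged. The warning is benign: state is repaired automatically."""
    if checkpointed < log_start or checkpointed > log_end:
        return log_start
    return checkpointed
```

Plugging in the reported numbers, the checkpoint 120955 is below the log start 203569, so cleaning resumes at 203569, exactly as the WARN says.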
[jira] [Resolved] (KAFKA-6650) The controller should be able to handle a partially deleted topic
[ https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-6650. Resolution: Fixed Fix Version/s: 2.0.0 merged the PR to trunk. > The controller should be able to handle a partially deleted topic > - > > Key: KAFKA-6650 > URL: https://issues.apache.org/jira/browse/KAFKA-6650 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.0.0 > > > A previous controller could have deleted some partitions of a topic from ZK, > but not all partitions, and then died. > In that case, the new controller should be able to handle the partially > deleted topic, and finish the deletion. > In the current code base, if there is no leadership info for a replica's > partition, the transition to OfflineReplica state for the replica will fail. > Afterwards the transition to ReplicaDeletionStarted will fail as well since > the only valid previous state for ReplicaDeletionStarted is OfflineReplica. > Furthermore, it means the topic deletion will never finish. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
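[Editor's note] The merged fix replaces a {{filter}} (which silently dropped replicas lacking leadership info, so they could never reach OfflineReplica and deletion wedged) with a {{partition}}, so both groups are handled: ISR shrinking for replicas with leadership info, and a direct state transition for those without. The split itself is just a two-way partition; a Python sketch of the idea (illustrative, not Kafka's code):

```python
def partition_by(pred, items):
    """Scala-style partition: (elements matching pred, elements not matching)."""
    matching, rest = [], []
    for item in items:
        (matching if pred(item) else rest).append(item)
    return matching, rest

# Replicas whose partition has leadership info go through ISR shrinking first;
# the rest can still be transitioned straight to OfflineReplica, so a
# partially deleted topic no longer blocks the deletion state machine.
leadership_info = {"t-0", "t-1"}          # partitions the controller knows a leader for
replicas = ["t-0", "t-1", "t-2"]          # "t-2" was already deleted from ZK
with_info, without_info = partition_by(lambda tp: tp in leadership_info, replicas)
```

With the old {{filter}}, "t-2" would simply vanish from the working set; with {{partition}}, it stays in the second list and still gets its state transition.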
[jira] [Commented] (KAFKA-6650) The controller should be able to handle a partially deleted topic
[ https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440195#comment-16440195 ]

ASF GitHub Bot commented on KAFKA-6650:
---------------------------------------

junrao closed pull request #4825: KAFKA-6650: Allowing transition to OfflineReplica state for replicas without leadership info
URL: https://github.com/apache/kafka/pull/4825

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index a2d04e65ae6..5fafcc4fe3f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig,
           controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition, deletePartition = false, (_, _) => ())
         }
-        val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
-        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
+        val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition { replica =>
+          controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
+        }
+        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
           if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
             val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
@@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig,
             logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
             replicaState.put(replica, OfflineReplica)
           }
+
+        replicasWithoutLeadershipInfo.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), OfflineReplica)
+          replicaState.put(replica, OfflineReplica)
+        }
       case ReplicaDeletionStarted =>
         validReplicas.foreach { replica =>
           logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 9b58fc7cc4b..a65128ad98a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
    * @return true if path gets deleted successfully, false if root path doesn't exist
    * @throws KeeperException if there is an error while deleting the znodes
    */
-  private[zk] def deleteRecursive(path: String): Boolean = {
+  def deleteRecursive(path: String): Boolean = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
     getChildrenResponse.resultCode match {
       case Code.OK =>
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ef455d4457e..4c033c421bb 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -17,7 +17,7 @@ package kafka.admin
 import kafka.log.Log
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
@@ -326,7 +326,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.head.setProperty("log.segment.bytes","100")
     brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
-    servers = createTestTopicAndCluster(topic,brokerConfigs)
+    servers = createTestTopicAndCluster(topic, brokerConfigs, expectedReplicaAssignment)
     // for simplicity, we are validating cleaner offsets on a single broker
     val server = servers.head
@@ -363,18 +363,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = {
+  private def createTestTopic
[jira] [Commented] (KAFKA-6796) Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas
[ https://issues.apache.org/jira/browse/KAFKA-6796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440133#comment-16440133 ] ASF GitHub Bot commented on KAFKA-6796: --- hachikuji opened a new pull request #4883: KAFKA-6796; Fix surprising UNKNOWN_TOPIC error from requests to non-replicas URL: https://github.com/apache/kafka/pull/4883 Currently if the client sends a produce request or a fetch request to a broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a bit surprising to see when the topic actually exists. It would be better to return NOT_LEADER to avoid confusion. Clients typically handle both errors by refreshing metadata and retrying, so changing this should not cause any change in behavior on the client. This case can be hit following a partition reassignment after the leader is moved and the local replica is deleted. To validate the current behavior and the fix, I've added integration tests for the fetch and produce APIs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas > - > > Key: KAFKA-6796 > URL: https://issues.apache.org/jira/browse/KAFKA-6796 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > Currently if the client sends a produce request or a fetch request to a > broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a > bit surprising to see when the topic actually exists. 
It would be better to > return NOT_LEADER to avoid confusion. Clients typically handle both errors by > refreshing metadata and retrying, so changing this should not cause any > change in behavior on the client. This case can be hit following a partition > reassignment after the leader is moved and the local replica is deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6796) Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas
Jason Gustafson created KAFKA-6796: -- Summary: Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas Key: KAFKA-6796 URL: https://issues.apache.org/jira/browse/KAFKA-6796 Project: Kafka Issue Type: Bug Affects Versions: 1.0.1, 1.1.0 Reporter: Jason Gustafson Assignee: Jason Gustafson Currently if the client sends a produce request or a fetch request to a broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a bit surprising to see when the topic actually exists. It would be better to return NOT_LEADER to avoid confusion. Clients typically handle both errors by refreshing metadata and retrying, so changing this should not cause any change in behavior on the client. This case can be hit following a partition reassignment after the leader is moved and the local replica is deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
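[Editor's note] The compatibility argument in this report can be made concrete: both error codes are retriable on the client, which responds to either by refreshing metadata and retrying, so the broker can swap one for the other without changing client behavior. A toy sketch (the error names match Kafka's protocol errors; the handler itself is illustrative, not any client's actual code):

```python
# Errors for which Kafka clients refresh metadata and retry the request.
METADATA_REFRESH_ERRORS = {"UNKNOWN_TOPIC_OR_PARTITION", "NOT_LEADER_FOR_PARTITION"}

def client_reaction(error: str) -> str:
    """Both the old and the proposed error land in the same retry path,
    which is why the broker-side change is client-compatible."""
    if error in METADATA_REFRESH_ERRORS:
        return "refresh-metadata-and-retry"
    return "fail"
```

The only difference is diagnosability: NOT_LEADER_FOR_PARTITION tells an operator the topic exists but this broker is not a replica, instead of implying the topic is missing.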
[jira] [Comment Edited] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440047#comment-16440047 ]

Srinivas Dhruvakumar edited comment on KAFKA-6649 at 4/16/18 9:27 PM:
----------------------------------------------------------------------

The Offset Out Of Range Exception has been fixed, but we hit the below while we were in the middle of our release. We got an exception while releasing the latest patch on one of the clusters. Is this expected?

2018-04-13 00:14:18,344 [ReplicaFetcherThread-2-348] ERROR (kafka.server.ReplicaFetcherThread) - [ReplicaFetcherThread-2-348]: Error due to
kafka.common.KafkaException: error processing data for partition [topic1-0,0] offset 325231
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:208)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.IllegalArgumentException: High watermark offset should be non-negative
 at kafka.cluster.Replica.highWatermark_$eq(Replica.scala:144)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:109)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:186)
 ... 13 more


was (Author: srinivas.d...@gmail.com):
The Offset Out Of Range Exception has been fixed, but we hit the below while we were in the middle of our release. We got an exception while releasing the latest patch on one of the clusters.

2018-04-13 00:14:18,344 [ReplicaFetcherThread-2-348] ERROR (kafka.server.ReplicaFetcherThread) - [ReplicaFetcherThread-2-348]: Error due to
kafka.common.KafkaException: error processing data for partition [topic1-0,0] offset 325231
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:208)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.IllegalArgumentException: High watermark offset should be non-negative
 at kafka.cluster.Replica.highWatermark_$eq(Replica.scala:144)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:109)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFet
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440047#comment-16440047 ] Srinivas Dhruvakumar commented on KAFKA-6649: - The Offset Out Of Range Exception has been fixed but we hit the below when we were in the middle of our release. We got an exception while releasing the latest patch on one of the clusters. 2018-04-13 00:14:18,344 [ReplicaFetcherThread-2-348] ERROR (kafka.server.ReplicaFetcherThread) - [ReplicaFetcherThread-2-348]: Error due to kafka.common.KafkaException: error processing data for partition [topic1-0,0] offset 325231 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:208) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) at 
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64) Caused by: java.lang.IllegalArgumentException: High watermark offset should be non-negative at kafka.cluster.Replica.highWatermark_$eq(Replica.scala:144) at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:109) at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:186) ... 13 more > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replication partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2
[jira] [Issue Comment Deleted] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Srinivas Dhruvakumar updated KAFKA-6649: Comment: was deleted (was: [~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can confirm that the fix works and is no longer an issue. This bug is fixed as part of -KAFKA-3978-. patch) > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replication partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like after the ReplicaFetcherThread is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. Examining the ShutdownableThread.scala object further: > {noformat} > override def run(): Unit = { > info("Starting") > try { > while (isRunning) > doWork() > } catch { > case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) > case e: Throwable => > if (isRunning) > error("Error due to", e) > } finally { > shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. I am not sure whether this is the intended behavior > of the ShutdownableThread, or whether the exception should be caught so that > we keep calling doWork(). > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
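For the non-fatal case the reporter describes, one alternative is to catch the exception and keep calling doWork() instead of letting the loop exit. A minimal, self-contained Java sketch of that alternative (hypothetical names; not the actual ShutdownableThread code, which is Scala):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker loop that survives non-fatal exceptions instead of
// silently exiting the way ShutdownableThread.run() does in the report above.
// doWork() is supplied by the subclass; Error stands in for fatal conditions.
abstract class ResilientWorker implements Runnable {
    protected final AtomicBoolean running = new AtomicBoolean(true);

    protected abstract void doWork() throws Exception;

    @Override
    public void run() {
        while (running.get()) {
            try {
                doWork();
            } catch (Error fatal) {
                running.set(false);   // fatal: stop the loop for real
                throw fatal;
            } catch (Exception e) {
                // non-fatal: log and keep fetching instead of stopping the thread
                System.err.println("Error due to " + e + ", continuing");
            }
        }
    }

    public void shutdown() {
        running.set(false);
    }
}
```

The trade-off is that a persistent error (such as the OffsetOutOfRangeException above) would then be retried forever, so real code would also want backoff or a retry limit.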
[jira] [Commented] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread
[ https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439991#comment-16439991 ] Dong Lin commented on KAFKA-6795: - Thanks for adding more tests [~apovzner]. Currently ReplicaAlterLogDirsThread is indirectly tested in AdminClientIntegrationTest.testAlterReplicaLogDirs(). Not sure if this test does what you are looking for. > Add unit test for ReplicaAlterLogDirsThread > --- > > Key: KAFKA-6795 > URL: https://issues.apache.org/jira/browse/KAFKA-6795 > Project: Kafka > Issue Type: Improvement >Reporter: Anna Povzner >Assignee: Anna Povzner >Priority: Major > > ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but > there is no unit test. > [~lindong] I assigned this to myself, since ideally I wanted to add unit > tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread
Anna Povzner created KAFKA-6795: --- Summary: Add unit test for ReplicaAlterLogDirsThread Key: KAFKA-6795 URL: https://issues.apache.org/jira/browse/KAFKA-6795 Project: Kafka Issue Type: Improvement Reporter: Anna Povzner Assignee: Anna Povzner ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but there is no unit test. [~lindong] I assigned this to myself, since ideally I wanted to add unit tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower
[ https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439970#comment-16439970 ] ASF GitHub Bot commented on KAFKA-6361: --- apovzner opened a new pull request #4882: KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over URL: https://github.com/apache/kafka/pull/4882 WIP - will add a few more unit tests. Implementation of KIP-279 as described here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over In summary: - Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE - Leader replies with the pair (largest epoch less than or equal to the requested epoch, the end offset of this epoch) - If the follower does not know about the leader epoch that the leader replies with, it truncates to the end offset of the largest leader epoch less than the leader epoch that the leader replied with, and sends another OffsetForLeaderEpoch request. That request contains the largest leader epoch less than the leader epoch that the leader replied with. Added integration test EpochDrivenReplicationProtocolAcceptanceTest.logsShouldNotDivergeOnUncleanLeaderElections that does 3 fast leader changes with unclean leader election enabled and min ISR of 1. The test failed before the fix was implemented. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fast leader fail over can lead to log divergence between leader and follower > > > Key: KAFKA-6361 > URL: https://issues.apache.org/jira/browse/KAFKA-6361 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Anna Povzner >Priority: Major > Labels: reliability -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439951#comment-16439951 ] Srinivas Dhruvakumar edited comment on KAFKA-6649 at 4/16/18 7:51 PM: -- [~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can confirm that the fix works and is no longer an issue. This bug is fixed as part of -KAFKA-3978-. patch was (Author: srinivas.d...@gmail.com): [~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can confirm that the fix works and is no longer an issue. This bug is fixed as part of KAFKA-3978. > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439951#comment-16439951 ] Srinivas Dhruvakumar commented on KAFKA-6649: - [~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can confirm that the fix works and is no longer an issue. This bug is fixed as part of KAFKA-3978. > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-4327. -- Resolution: Won't Fix I'm resolving this ticket as won't fix for now. > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor > > This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008 > Currently, the Kafka Streams Application Reset Tool is part of the {{core}} module > due to a ZK dependency. After KIP-4 got merged, this dependency can be dropped > and the Reset Tool can be moved to the {{streams}} module. > This should also update {{InternalTopicManager#filterExistingTopics}}, which > refers to the Reset Tool in an exception message: > {{"Use 'kafka.tools.StreamsResetter' tool"}} > -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}} > Doing this JIRA also requires updating the docs with regard to broker > backward compatibility -- not all brokers support the "topic delete request" and > thus, the reset tool will not be backward compatible with all broker versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439943#comment-16439943 ] Guozhang Wang commented on KAFKA-4327: -- As we discussed before, it may not be worth the effort since 1) most users are likely going to call the `streams reset` shell script, in which case they are likely to have a core package installed anyway, and 2) thinking about it twice, leaving the reset tool in `kafka.tools` is okay since that is where we put all the admin tooling classes; personally I think we could also consider `o.a.k.tools`, but again that is not very significant either. > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric
[ https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6514. Resolution: Fixed Fix Version/s: 1.2.0 > Add API version as a tag for the RequestsPerSec metric > -- > > Key: KAFKA-6514 > URL: https://issues.apache.org/jira/browse/KAFKA-6514 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Allen Wang >Priority: Major > Fix For: 1.2.0 > > > After we upgrade broker to a new version, one important insight is to see how > many clients have been upgraded so that we can switch the message format when > most of the clients have also been updated to the new version to minimize the > performance penalty. > RequestsPerSec with the version tag will give us that insight. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
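The insight described in KAFKA-6514 amounts to keeping one counter per (request, API version) pair. A minimal stand-alone Java sketch of that idea, using plain concurrent counters rather than Kafka's actual Yammer-based RequestsPerSec metric (class and method names here are illustrative, not Kafka's):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Counts requests keyed by "<apiName>-v<version>", mimicking a metric tagged
// with the API version so operators can see which client versions remain
// after a broker upgrade.
class VersionedRequestCounter {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    void record(String apiName, short version) {
        counts.computeIfAbsent(apiName + "-v" + version, k -> new LongAdder())
              .increment();
    }

    long count(String apiName, short version) {
        LongAdder a = counts.get(apiName + "-v" + version);
        return a == null ? 0 : a.sum();
    }
}
```

Once traffic on old versions drops to zero, it is safer to switch the message format without a conversion penalty.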
[jira] [Commented] (KAFKA-6727) org.apache.kafka.clients.admin.Config has broken equals and hashCode method.
[ https://issues.apache.org/jira/browse/KAFKA-6727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439922#comment-16439922 ] ASF GitHub Bot commented on KAFKA-6727: --- hachikuji closed pull request #4796: KAFKA-6727 fix broken Config hashCode() and equals() URL: https://github.com/apache/kafka/pull/4796 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java index 56f9c270792..c81c0b60274 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java @@ -21,39 +21,40 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; /** * A configuration object containing the configuration entries for a resource. - * + * The API of this class is evolving, see {@link AdminClient} for details. */ @InterfaceStability.Evolving public class Config { -private final Collection<ConfigEntry> entries; +private final Map<String, ConfigEntry> entries = new HashMap<>(); /** * Create a configuration instance with the provided entries. */ public Config(Collection<ConfigEntry> entries) { -this.entries = Collections.unmodifiableCollection(entries); +for (ConfigEntry entry : entries) { +this.entries.put(entry.name(), entry); +} } /** * Configuration entries for a resource. */ public Collection<ConfigEntry> entries() { -return entries; +return Collections.unmodifiableCollection(entries.values()); } /** * Get the configuration entry with the provided name or null if there isn't one. 
*/ public ConfigEntry get(String name) { -for (ConfigEntry entry : entries) -if (entry.name().equals(name)) -return entry; -return null; +return entries.get(name); } @Override @@ -75,6 +76,6 @@ public int hashCode() { @Override public String toString() { -return "Config(entries=" + entries + ")"; +return "Config(entries=" + entries.values() + ")"; } } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java new file mode 100644 index 000..45b174bec19 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +public class ConfigTest { +private static final ConfigEntry E1 = new ConfigEntry("a", "b"); +private static final ConfigEntry E2 = new ConfigEntry("c", "d"); +private Config config; + +@Before +public void setUp() { +final Collection<ConfigEntry> entries = new ArrayList<>(); +entries.add(E1); +entries.add(E2); + +config = new Config(entries); +} + +@Test +public void shouldGetEntry() { +assertThat(config.get("a"), is(E1)); +assertThat(config.get("c"), is(E2)); +} + +@Test +public void shouldReturnNullOnGetUnknownEntry() { +assertThat(config.get("unknown"), is(nullValue())); +} + +@Test +public void shouldGetAllEntries() { +assertThat(config.entries().size(), is(2)); +assertThat(config.entries(), hasItem
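The gist of the merged fix: backing Config with a name-keyed map makes get() a single lookup and gives equals()/hashCode() order-independent semantics. A condensed, stand-alone Java sketch of that idea (simplified stand-ins for the real clients classes, not the actual code):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for org.apache.kafka.clients.admin.ConfigEntry.
final class Entry {
    final String name, value;
    Entry(String name, String value) { this.name = name; this.value = value; }
    @Override public boolean equals(Object o) {
        return o instanceof Entry && ((Entry) o).name.equals(name)
                && ((Entry) o).value.equals(value);
    }
    @Override public int hashCode() { return Objects.hash(name, value); }
}

// Simplified stand-in for org.apache.kafka.clients.admin.Config after the fix.
final class SimpleConfig {
    private final Map<String, Entry> entries = new HashMap<>();

    SimpleConfig(Collection<Entry> es) {
        for (Entry e : es) entries.put(e.name, e);   // keyed by name: O(1) get()
    }
    Entry get(String name) { return entries.get(name); }
    Collection<Entry> entries() {
        return Collections.unmodifiableCollection(entries.values());
    }
    // Map equality ignores insertion order, which is what makes two configs
    // with the same entries compare equal regardless of construction order.
    @Override public boolean equals(Object o) {
        return o instanceof SimpleConfig && ((SimpleConfig) o).entries.equals(entries);
    }
    @Override public int hashCode() { return entries.hashCode(); }
}
```

With the original Collection-backed field, two configs holding the same entries in a different order would not have compared equal.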
[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams
[ https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439821#comment-16439821 ] Jorge Quilcate commented on KAFKA-4327: --- Now that [https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API] is merged, does it make sense to reopen this issue, to move `StreamsResetter` to `streams` module? or should we just keep it in `core` module and close it? > Move Reset Tool from core to streams > > > Key: KAFKA-4327 > URL: https://issues.apache.org/jira/browse/KAFKA-4327 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Minor -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6792) Wrong pointer in the link for stream dsl
[ https://issues.apache.org/jira/browse/KAFKA-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6792. -- Resolution: Fixed Fix Version/s: 1.0.0 > Wrong pointer in the link for stream dsl > > > Key: KAFKA-6792 > URL: https://issues.apache.org/jira/browse/KAFKA-6792 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: robin m >Priority: Major > Fix For: 1.0.0 > > > Wrong pointer in the link for stream dsl. > actual is : > [http://kafka.apache.org/11/documentation/streams/developer-guide#streams|http://kafka.apache.org/11/documentation/streams/developer-guide/#streams]_dsl > correct is : > http://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#streams-dsl -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-6794: --- Description: Say you have a replication factor of 4 and you trigger a reassignment which moves all replicas to new brokers. Now 8 replicas are fetching at the same time which means you need to account for 8 times the current producer load plus the catch-up replication. To make matters worse, the replicas won't all become in-sync at the same time; in the worst case, you could have 7 replicas in-sync while one is still catching up. Currently, the old replicas won't be disabled until all new replicas are in-sync. This makes configuring the throttle tricky since ISR traffic is not subject to it. Rather than trying to bring all 4 new replicas online at the same time, a friendlier approach would be to do it incrementally: bring one replica online, bring it in-sync, then remove one of the old replicas. Repeat until all replicas have been changed. This would reduce the impact of a reassignment and make configuring the throttle easier at the cost of a slower overall reassignment. was: Say you have a replication factor of 4 and you trigger a reassignment which moves all replicas to new brokers. Now 8 replicas are fetching at the same time which means you need to account for 8 times the current load plus the catch-up replication. To make matters worse, the replicas won't all become in-sync at the same time; in the worst case, you could have 7 replicas in-sync while one is still catching up. Currently, the old replicas won't be disabled until all new replicas are in-sync. This makes configuring the throttle tricky since ISR traffic is not subject to it. Rather than trying to bring all 4 new replicas online at the same time, a friendlier approach would be to do it incrementally: bring one replica online, bring it in-sync, then remove one of the old replicas. Repeat until all replicas have been changed. 
This would reduce the impact of a reassignment and make configuring the throttle easier at the cost of a slower overall reassignment. > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. > Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
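The incremental approach described in KAFKA-6794 reads naturally as a loop: add one target replica, wait for it to join the ISR, then retire one old replica. A toy Java sketch of that sequencing (lists of broker ids standing in for real reassignment plumbing; it assumes the old and new broker sets are disjoint and the same size, as in the 4-replica example above):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model of incremental reassignment: migrate one broker at a time so at
// most replicationFactor + 1 replicas fetch concurrently, instead of 2x.
final class IncrementalReassignment {
    static List<Set<Integer>> plan(List<Integer> current, List<Integer> target) {
        List<Set<Integer>> steps = new ArrayList<>();
        List<Integer> assignment = new ArrayList<>(current);
        for (int i = 0; i < target.size(); i++) {
            assignment.add(target.get(i));                        // bring one new replica online
            steps.add(new LinkedHashSet<>(assignment));           // <- wait here until it is in-sync
            assignment.remove(Integer.valueOf(current.get(i)));   // then retire one old replica
            steps.add(new LinkedHashSet<>(assignment));
        }
        return steps;
    }
}
```

For a move of [1, 2, 3, 4] to [5, 6, 7, 8] this never holds more than 5 replicas at once, at the cost of more steps and a longer overall reassignment, which matches the trade-off stated above.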
[jira] [Created] (KAFKA-6794) Support for incremental replica reassignment
Jason Gustafson created KAFKA-6794: -- Summary: Support for incremental replica reassignment Key: KAFKA-6794 URL: https://issues.apache.org/jira/browse/KAFKA-6794 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Say you have a replication factor of 4 and you trigger a reassignment which moves all replicas to new brokers. Now 8 replicas are fetching at the same time which means you need to account for 8 times the current load plus the catch-up replication. To make matters worse, the replicas won't all become in-sync at the same time; in the worst case, you could have 7 replicas in-sync while one is still catching up. Currently, the old replicas won't be disabled until all new replicas are in-sync. This makes configuring the throttle tricky since ISR traffic is not subject to it. Rather than trying to bring all 4 new replicas online at the same time, a friendlier approach would be to do it incrementally: bring one replica online, bring it in-sync, then remove one of the old replicas. Repeat until all replicas have been changed. This would reduce the impact of a reassignment and make configuring the throttle easier at the cost of a slower overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower
[ https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anna Povzner reassigned KAFKA-6361: --- Assignee: Anna Povzner (was: Jason Gustafson) > Fast leader fail over can lead to log divergence between leader and follower > > > Key: KAFKA-6361 > URL: https://issues.apache.org/jira/browse/KAFKA-6361 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Anna Povzner >Priority: Major > Labels: reliability > > We have observed an edge case in the replication failover logic which can > cause a replica to permanently fall out of sync with the leader or, in the > worst case, actually have localized divergence between logs. This occurs in > spite of the improved truncation logic from KIP-101. > Suppose we have brokers A and B. Initially A is the leader in epoch 1. It > appends two batches: one in the range (0, 10) and the other in the range (11, > 20). The first one successfully replicates to B, but the second one does not. > In other words, the logs on the brokers look like this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > {code} > Broker A then has a zk session expiration and broker B is elected with epoch > 2. It appends a new batch with offsets (11, n) to its local log. So we now > have this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets: [11, n], leader epoch: 2 > {code} > Normally we expect broker A to truncate to offset 11 on becoming the > follower, but before it is able to do so, broker B has its own zk session > expiration and broker A again becomes leader, now with epoch 3. It then > appends a new entry in the range (21, 30). 
The updated logs look like this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > 2: offsets: [21, 30], leader epoch: 3 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets: [11, n], leader epoch: 2 > {code} > Now what happens next depends on the last offset of the batch appended in > epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch > request to broker A with epoch 2. Broker A will respond that epoch 2 ends at > offset 21. There are three cases: > 1) n < 20: In this case, broker B will not do any truncation. It will begin > fetching from offset n, which will ultimately cause an out of order offset > error because broker A will return the full batch beginning from offset 11 > which broker B will be unable to append. > 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 > and everything will appear fine though the logs have actually diverged. > 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in > the middle of the batch, it will truncate all the way to offset 10. It can > begin fetching from offset 11 and everything is fine. > The case we have actually seen is the first one. The second one would likely > go unnoticed in practice and everything is fine in the third case. To > workaround the issue, we deleted the active segment on the replica which > allowed it to re-replicate consistently from the leader. > I'm not sure the best solution for this scenario. Maybe if the leader isn't > aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} > instead of using the offset of the next highest epoch. That would cause the > follower to truncate using its high watermark. Or perhaps instead of doing > so, it could send another OffsetForLeaderEpoch request at the next previous > cached epoch and then truncate using that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
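The three cases above can be restated as a small decision helper. This is an illustrative sketch of the logic in the description, not Kafka's actual follower truncation code: `n` is the last offset of broker B's epoch-2 batch, and `leaderEpochEnd` is the end offset broker A reports for epoch 2 (21 in the example).

```java
// Sketch of the follower's three possible outcomes after an
// OffsetsForLeaderEpoch response, per the scenario in this report.
// Class and method names are hypothetical.
class EpochTruncation {
    public static String followerAction(long n, long leaderEpochEnd) {
        if (n + 1 < leaderEpochEnd) {
            // Case 1: no truncation; fetching resumes inside the leader's
            // epoch-1 batch, causing an out-of-order offset error.
            return "no truncation; out-of-order offset error";
        } else if (n + 1 == leaderEpochEnd) {
            // Case 2: no truncation and no error, but the logs have
            // silently diverged at offsets 11..n.
            return "no truncation; silent divergence";
        } else {
            // Case 3: truncate to the reported end offset; since it falls
            // mid-batch, truncation goes back to the batch start and
            // replication proceeds correctly.
            return "truncate; correct recovery";
        }
    }
}
```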
[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric
[ https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439737#comment-16439737 ] ASF GitHub Bot commented on KAFKA-6514: --- hachikuji closed pull request #4506: KAFKA-6514: Add API version as a tag for the RequestsPerSec metric URL: https://github.com/apache/kafka/pull/4506 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 8a17528bfb7..f03bdeb9dd9 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -158,7 +158,7 @@ object RequestChannel extends Logging { val metricNames = fetchMetricNames :+ header.apiKey.name metricNames.foreach { metricName => val m = metrics(metricName) -m.requestRate.mark() +m.requestRate(header.apiVersion).mark() m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs)) m.localTimeHist.update(Math.round(apiLocalTimeMs)) m.remoteTimeHist.update(Math.round(apiRemoteTimeMs)) @@ -350,10 +350,11 @@ object RequestMetrics { } class RequestMetrics(name: String) extends KafkaMetricsGroup { + import RequestMetrics._ val tags = Map("request" -> name) - val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags) + val requestRateInternal = new mutable.HashMap[Short, Meter] // time a request spent in a request queue val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags) // time a request takes to be processed at the local broker @@ -386,6 +387,10 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { private val errorMeters = mutable.Map[Errors, ErrorMeter]() Errors.values.foreach(error => 
errorMeters.put(error, new ErrorMeter(name, error))) + def requestRate(version: Short): Meter = { + requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags + ("version" -> version.toString))) + } + class ErrorMeter(name: String, error: Errors) { private val tags = Map("request" -> name, "error" -> error.name) @@ -418,7 +423,7 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { } def removeMetrics(): Unit = { -removeMetric(RequestsPerSec, tags) +for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, tags + ("version" -> version.toString)) removeMetric(RequestQueueTimeMs, tags) removeMetric(LocalTimeMs, tags) removeMetric(RemoteTimeMs, tags) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index e6dadbb4253..7d3b42897cc 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -688,10 +688,14 @@ class SocketServerTest extends JUnitSuite { @Test def testRequestMetricsAfterStop(): Unit = { server.stopProcessingRequests() - -server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark() +val version = ApiKeys.PRODUCE.latestVersion +val version2 = (version - 1).toShort +for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark() + server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark() +assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count()) server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) -val nonZeroMeters = Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 1, +val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2, + 
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1, "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1) def requestMetricMeters = YammerMetrics diff --git a/docs/upgrade.html b/docs/upgrade.html index 95f2c418c3b..d369d1de79d 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -68,6 +68,12 @@ Notable changes in 1 https://cwiki.apache.org/confluence/x/oYtjB";>KIP-186 increases the default offset retention time from 1 day to 7 days. This makes it less likely to "lose" offsets in an application that commits infrequently. It also increases the active set of offsets and therefore can increase memory usage on the broker. Note that the console consumer currently enables offs
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439599#comment-16439599 ] Jason Gustafson commented on KAFKA-6649: [~srinivas.d...@gmail.com] I think those log messages are just a little misleading and not necessarily indicative of a problem. When the leader loads the replica state, it only knows the local high watermark. Other replicas are initialized to 0 until they begin fetching. This is normal. Can you clarify what you mean when you say it is causing topics to replay? > ReplicaFetcher stopped after non fatal exception is thrown > -- > > Key: KAFKA-6649 > URL: https://issues.apache.org/jira/browse/KAFKA-6649 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Julio Ng >Priority: Major > > We have seen several under-replication partitions, usually triggered by topic > creation. After digging in the logs, we see the below: > {noformat} > [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread) > kafka.common.KafkaException: Error processing data for partition > [[TOPIC_NAME_REMOVED]]-84 offset 2098535 > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at > kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 2098535 of partition > [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1 > [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, > fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat} > It looks like that after the ReplicaFetcherThread is stopped, the replicas > start to lag behind, presumably because we are not fetching from the leader > anymore. Further examining, the ShutdownableThread.scala object: > {noformat} > override def run(): Unit = { > info("Starting") > try { >while (isRunning) > doWork() > } catch { >case e: FatalExitError => > shutdownInitiated.countDown() > shutdownComplete.countDown() > info("Stopped") > Exit.exit(e.statusCode()) >case e: Throwable => > if (isRunning) >error("Error due to", e) > } finally { >shutdownComplete.countDown() > } > info("Stopped") > }{noformat} > For the Throwable (non-fatal) case, it just exits the while loop and the > thread stops doing work. I am not sure whether this is the intended behavior > of the ShutdownableThread, or the exception should be caught and we should > keep calling doWork() > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
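The alternative the reporter raises, catching the non-fatal exception inside the loop so that doWork() keeps being called, can be sketched as below. This is illustrative only and not Kafka's ShutdownableThread; the class name, the Runnable-based constructor, and the bounded runFor helper are all made up for the sketch.

```java
// Sketch: a worker loop that logs non-fatal exceptions and keeps looping,
// instead of breaking out of the while loop and leaving the fetcher dead.
class ResilientLoop {
    private volatile boolean running = true;
    private final Runnable work;

    ResilientLoop(Runnable work) {
        this.work = work;
    }

    // Run up to 'iterations' passes; returns how many passes completed.
    public int runFor(int iterations) {
        int completed = 0;
        for (int i = 0; i < iterations && running; i++) {
            try {
                work.run();
            } catch (RuntimeException e) {
                // Non-fatal: log and continue rather than letting the thread die.
                System.err.println("Error due to (continuing): " + e);
            }
            completed++;
        }
        return completed;
    }

    public void shutdown() {
        running = false;
    }
}
```

The trade-off is that some exceptions (for example the OffsetOutOfRangeException above) may indicate state that won't heal by retrying, so a real fix would likely distinguish retriable errors from ones that should stop the fetcher.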
[jira] [Comment Edited] (KAFKA-5975) No response when deleting topics and delete.topic.enable=false
[ https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439572#comment-16439572 ] Mario Molina edited comment on KAFKA-5975 at 4/16/18 3:19 PM: -- What about this fix? [~ewencp] [~rsivaram] was (Author: mmolimar): What about this fix [~ewencp]? > No response when deleting topics and delete.topic.enable=false > -- > > Key: KAFKA-5975 > URL: https://issues.apache.org/jira/browse/KAFKA-5975 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.0 >Reporter: Mario Molina >Assignee: Mario Molina >Priority: Minor > Fix For: 1.0.2 > > > When trying to delete topics using the KafkaAdminClient and the flag in > server config is set as 'delete.topic.enable=false', the client cannot get a > response and fails returning a timeout error. This is due to the object > DelayedCreatePartitions cannot complete the operation. > This bug fix modifies the KafkaApi key DELETE_TOPICS taking into account that > the flag can be disabled and swallow the error to the client, this is, the > topic is never removed and no error is returned to the client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5975) No response when deleting topics and delete.topic.enable=false
[ https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439572#comment-16439572 ] Mario Molina commented on KAFKA-5975: - What about this fix [~ewencp]? > No response when deleting topics and delete.topic.enable=false > -- > > Key: KAFKA-5975 > URL: https://issues.apache.org/jira/browse/KAFKA-5975 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.0 >Reporter: Mario Molina >Assignee: Mario Molina >Priority: Minor > Fix For: 1.0.2 > > > When trying to delete topics using the KafkaAdminClient and the flag in > server config is set as 'delete.topic.enable=false', the client cannot get a > response and fails returning a timeout error. This is due to the object > DelayedCreatePartitions cannot complete the operation. > This bug fix modifies the KafkaApi key DELETE_TOPICS taking into account that > the flag can be disabled and swallow the error to the client, this is, the > topic is never removed and no error is returned to the client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439502#comment-16439502 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4880: KAFKA-6054: Update Kafka Streams metadata to version 3 URL: https://github.com/apache/kafka/pull/4880 - adds Streams upgrade tests for 1.1 release This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 1.2.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade] > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that at one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. 
> We observed the following stack trace: > {code:java} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue th
[jira] [Created] (KAFKA-6793) Unnecessary warning log message
Anna O created KAFKA-6793: - Summary: Unnecessary warning log message Key: KAFKA-6793 URL: https://issues.apache.org/jira/browse/KAFKA-6793 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Anna O After upgrading Kafka Streams from 0.11.0.2 to 1.1.0, the following warning started to appear: level: WARN logger: org.apache.kafka.clients.consumer.ConsumerConfig message: The configuration 'admin.retries' was supplied but isn't a known config. The config is not explicitly supplied to the Streams application. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6771) Make specifying partitions in RoundTripWorkload, ProduceBench more flexible
[ https://issues.apache.org/jira/browse/KAFKA-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439088#comment-16439088 ] ASF GitHub Bot commented on KAFKA-6771: --- rajinisivaram closed pull request #4850: KAFKA-6771. Make specifying partitions more flexible URL: https://github.com/apache/kafka/pull/4850 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java new file mode 100644 index 000..82f5003fbb6 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.common; + +import java.util.HashSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utilities for expanding strings that have range expressions in them. 
+ * + * For example, 'foo[1-3]' would be expaneded to foo1, foo2, foo3. + * Strings that have no range expressions will not be expanded. + */ +public class StringExpander { +private final static Pattern NUMERIC_RANGE_PATTERN = +Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)"); + +public static HashSet expand(String val) { +HashSet set = new HashSet<>(); +Matcher matcher = NUMERIC_RANGE_PATTERN.matcher(val); +if (!matcher.matches()) { +set.add(val); +return set; +} +String prequel = matcher.group(1); +String rangeStart = matcher.group(2); +String rangeEnd = matcher.group(3); +String epilog = matcher.group(4); +int rangeStartInt = Integer.parseInt(rangeStart); +int rangeEndInt = Integer.parseInt(rangeEnd); +if (rangeEndInt < rangeStartInt) { +throw new RuntimeException("Invalid range: start " + rangeStartInt + +" is higher than end " + rangeEndInt); +} +for (int i = rangeStartInt; i <= rangeEndInt; i++) { +set.add(String.format("%s%d%s", prequel, i, epilog)); +} +return set; +} +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 0677296ee3c..f2eb0343aa8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -132,7 +132,7 @@ public StopTaskResponse stopTask(StopTaskRequest request) throws Exception { public TasksResponse tasks(TasksRequest request) throws Exception { UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks")); -uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0])); +uriBuilder.queryParam("taskId", (Object[]) request.taskIds().toArray(new String[0])); uriBuilder.queryParam("firstStartMs", request.firstStartMs()); uriBuilder.queryParam("lastStartMs", request.lastStartMs()); uriBuilder.queryParam("firstEndMs", request.firstEndMs()); diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java index cef913bc01a..1b429ead3c8 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java @@ -41,10 +41,7 @@ private final Map consumerConf; private final Map adminClientConf; private final Map commonClientConf; -private final String topicRegex; -private final int startPartition; -private final int endPartition; - +private final TopicsSpec activeTopics; @JsonCreator public ConsumeBenchSpec(@Json
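The expansion logic from the StringExpander diff above can be restated compactly to make its behavior checkable on its own: 'foo[1-3]' expands to foo1, foo2, foo3, and strings without a range expression pass through unchanged. The class name below is hypothetical; the regex and semantics follow the PR.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Condensed restatement of StringExpander.expand from the merged PR:
// expand a numeric range expression like 'foo[1-3]' into a set of strings.
class RangeExpander {
    private static final Pattern RANGE =
        Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)");

    public static Set<String> expand(String val) {
        Set<String> out = new HashSet<>();
        Matcher m = RANGE.matcher(val);
        if (!m.matches()) {
            out.add(val);          // no range expression: pass through
            return out;
        }
        int start = Integer.parseInt(m.group(2));
        int end = Integer.parseInt(m.group(3));
        if (end < start) {
            throw new RuntimeException("Invalid range: start " + start
                + " is higher than end " + end);
        }
        for (int i = start; i <= end; i++) {
            out.add(m.group(1) + i + m.group(4)); // prequel + index + epilog
        }
        return out;
    }
}
```

Note that the pattern only matches one bracketed range per string; nested or multiple ranges fall through unexpanded, which matches the PR's single-match logic.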
[jira] [Commented] (KAFKA-6696) Trogdor should support destroying tasks
[ https://issues.apache.org/jira/browse/KAFKA-6696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439086#comment-16439086 ] ASF GitHub Bot commented on KAFKA-6696: --- rajinisivaram closed pull request #4759: KAFKA-6696 Trogdor should support destroying tasks URL: https://github.com/apache/kafka/pull/4759 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 2767132886d..64258bf7b07 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -45,7 +45,7 @@ + files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/> diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index 3b5b21e68d8..0324d2d2dba 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -27,10 +27,9 @@ import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; -import org.apache.kafka.trogdor.rest.CreateWorkerResponse; +import org.apache.kafka.trogdor.rest.DestroyWorkerRequest; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.StopWorkerRequest; -import org.apache.kafka.trogdor.rest.StopWorkerResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,13 +94,16 @@ public AgentStatusResponse status() throws Exception { return new AgentStatusResponse(serverStartMs, workerManager.workerStates()); } -public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws Exception { 
-workerManager.createWorker(req.id(), req.spec()); -return new CreateWorkerResponse(req.spec()); +public void createWorker(CreateWorkerRequest req) throws Throwable { +workerManager.createWorker(req.workerId(), req.taskId(), req.spec()); } -public StopWorkerResponse stopWorker(StopWorkerRequest req) throws Exception { -return new StopWorkerResponse(workerManager.stopWorker(req.id())); +public void stopWorker(StopWorkerRequest req) throws Throwable { +workerManager.stopWorker(req.workerId(), false); +} + +public void destroyWorker(DestroyWorkerRequest req) throws Throwable { +workerManager.stopWorker(req.workerId(), true); } public static void main(String[] args) throws Exception { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java index 08769a0971d..c89011b8650 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java @@ -27,15 +27,16 @@ import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; -import org.apache.kafka.trogdor.rest.CreateWorkerResponse; +import org.apache.kafka.trogdor.rest.DestroyWorkerRequest; import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse; import org.apache.kafka.trogdor.rest.StopWorkerRequest; -import org.apache.kafka.trogdor.rest.StopWorkerResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.UriBuilder; + import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; @@ -116,20 +117,29 @@ public AgentStatusResponse status() throws Exception { return resp.body(); } -public CreateWorkerResponse createWorker(CreateWorkerRequest 
request) throws Exception { -HttpResponse resp = -JsonRestServer.httpRequest( +public void createWorker(CreateWorkerRequest request) throws Exception { +HttpResponse resp = +JsonRestServer.httpRequest( url("/agent/worker/create"), "POST", -request, new TypeReference() { }, maxTries); -return resp.body(); +request, new TypeReference() { }, maxTries); +resp.body(); } -public StopWorkerResponse stopWorker(StopWorkerRequest request) throws Exception { -HttpResponse resp = -JsonRestServer.httpRequest(url( +public void stopWorker(StopWorkerRequest request) throws Exception { +HttpResponse resp = +JsonRestServer.httpReque
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439050#comment-16439050 ] ASF GitHub Bot commented on KAFKA-6788: --- cyrusv opened a new pull request #4878: KAFKA-6788: Combine queries for describe and delete groups in AdminCl… URL: https://github.com/apache/kafka/pull/4878 1 Ask coordinator to list its groups 2 Try to delete/describe them all 3 Track which entries are successfully deleted or described 4 Continue over successfully deleted/described entries This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Grouping consumer requests per consumer coordinator in admin client > --- > > Key: KAFKA-6788 > URL: https://issues.apache.org/jira/browse/KAFKA-6788 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we > will first try to get the coordinator for each requested group id, and then > send the corresponding request for that group id. However, different group > ids could be hosted on the same coordinator, and these requests do support > multiple group ids being sent within the same request. So we can consider > optimizing it by grouping the requests per coordinator destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
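The proposed optimization, resolving each group's coordinator first and then batching group ids that share one into a single request, can be sketched as below. Names are illustrative, not KafkaAdminClient's actual internals; findCoordinator stands in for the FindCoordinator round trip.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch: group requested group ids by their coordinator's broker id so
// one DescribeGroups/DeleteGroups request can be sent per coordinator,
// instead of one request per group id.
class CoordinatorBatcher {
    public static Map<Integer, List<String>> groupByCoordinator(
            List<String> groupIds, Function<String, Integer> findCoordinator) {
        Map<Integer, List<String>> batches = new HashMap<>();
        for (String groupId : groupIds) {
            int coordinator = findCoordinator.apply(groupId);
            batches.computeIfAbsent(coordinator, k -> new ArrayList<>()).add(groupId);
        }
        return batches; // one multi-group request per map key
    }
}
```

With N groups spread over K coordinators this reduces the group-level requests from N to K, at the cost of having to split a batch back apart when individual groups in it fail.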