[jira] [Created] (KAFKA-6797) Connect oracle database to kafka and stream data

2018-04-16 Thread Ujjwal Kumar (JIRA)
Ujjwal Kumar created KAFKA-6797:
---

 Summary: Connect oracle database to kafka and stream data
 Key: KAFKA-6797
 URL: https://issues.apache.org/jira/browse/KAFKA-6797
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Affects Versions: 1.0.1
 Environment: Windows OS
Reporter: Ujjwal Kumar


Hi,


Please help me with the steps to connect Kafka with an Oracle DB (or any other database).
I tried searching online but couldn't find anything.

My requirement is simple: Kafka should be able to get data from an Oracle 
database so that we can consume it elsewhere.

I have a Windows setup.
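
One common approach is Kafka Connect with a JDBC source connector. A minimal sketch, assuming Confluent's kafka-connect-jdbc connector (a separate plugin, not bundled with Apache Kafka) and placeholder connection details:

{code}
# jdbc-oracle-source.properties -- hypothetical example; host, service name, table
# and credentials are placeholders, and the Oracle JDBC driver must be on the classpath
name=oracle-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:oracle:thin:@//dbhost:1521/ORCL
connection.user=kafka_user
connection.password=secret
mode=incrementing
incrementing.column.name=ID
table.whitelist=MY_TABLE
topic.prefix=oracle-
{code}

Run it in standalone mode (on Windows, use the equivalent scripts under bin\windows):

{code}
bin/connect-standalone.sh config/connect-standalone.properties jdbc-oracle-source.properties
{code}

Rows from MY_TABLE then land on the topic oracle-MY_TABLE, where any consumer can read them.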



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid

2018-04-16 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6266:
---
Affects Version/s: 1.0.1

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0, 1.0.1
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>Priority: Major
>
> I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
> warnings in the log.
>  I'm seeing these continuously, and want them fixed so that they won't repeat. 
> Can someone please help me fix the below warnings?
> {code}
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is

2018-04-16 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440212#comment-16440212
 ] 

Jeff Widman edited comment on KAFKA-6266 at 4/17/18 1:07 AM:
-

I am hitting this after upgrading a 0.10.0.1 broker to 1.0.1. 

the __consumer_offsets topic has the following config:
{code}
Topic:__consumer_offsets  PartitionCount:157  ReplicationFactor:2  Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
{code}

Ignore the non-standard partition count; it is a byproduct of a bug from years 
ago. In this case, I think the only effect is that it makes it less likely that 
any given partition within the __consumer_offsets topic gets produced to, which, 
it sounds like, would clear this error.

As described above, the external symptom was a zero-byte log file with a name 
like 00012526.log.

Since this particular cluster has significantly more partitions in 
__consumer_offsets than it has consumer groups, it will not clear the error 
anytime soon, because no consumer group's offsets are being hashed onto the 
problem partitions.

So to get out of the situation, I shut down all brokers that have replicas of 
the partition, then deleted the log files for that partition, then restarted the 
brokers. This cleared the filename so that it matched the zero-byte contents.
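
A rough sketch of those steps (log directory, partition number, and service management are hypothetical; adjust for your environment):

{code}
# identify the problem partition: a zero-byte segment whose base offset is ahead
# of that partition's entry in the cleaner-offset-checkpoint file
find /var/kafka-logs/__consumer_offsets-* -name '*.log' -size 0 -ls
cat /var/kafka-logs/cleaner-offset-checkpoint

# then, on every broker holding a replica of the affected partition:
systemctl stop kafka                               # or however the broker is managed
rm -rf /var/kafka-logs/__consumer_offsets-19/*     # delete the stale log files for that partition
systemctl start kafka
{code}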

Note that doing this in production will require downtime, as you are taking a 
partition of the __consumer_offsets topic completely offline. On the flip side, 
you are only likely to hit this on somewhat underloaded clusters that can likely 
afford the downtime... typically, busy production clusters will clear this 
themselves automatically through consumer groups producing to the partition.


was (Author: jeffwidman):
I am hitting this after upgrading a 0.10.0.1 broker to 1.0.1. 

the __consumer_offsets topic has the following config:
{code}
Topic:__consumer_offsets  PartitionCount:157  ReplicationFactor:2  Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
{code}

Ignore the non-standard partition count; it is a byproduct of a bug from years 
ago. In this case, I think the only effect is that it makes it less likely that 
any given partition within the __consumer_offsets topic gets produced to, which, 
it sounds like, would clear this error.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>Priority: Major
>
> I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
> warnings in the log.
>  I'm seeing these continuously, and want them fixed so that they won't repeat. 
> Can someone please help me fix the below warnings?
> {code}
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval

2018-04-16 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440212#comment-16440212
 ] 

Jeff Widman commented on KAFKA-6266:


I am hitting this after upgrading a 0.10.0.1 broker to 1.0.1. 

the __consumer_offsets topic has the following config:
{code}
Topic:__consumer_offsets  PartitionCount:157  ReplicationFactor:2  Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
{code}

Ignore the non-standard partition count; it is a byproduct of a bug from years 
ago. In this case, I think the only effect is that it makes it less likely that 
any given partition within the __consumer_offsets topic gets produced to, which, 
it sounds like, would clear this error.

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>Priority: Major
>
> I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
> warnings in the log.
>  I'm seeing these continuously, and want them fixed so that they won't repeat. 
> Can someone please help me fix the below warnings?
> {code}
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is invalid

2018-04-16 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6266:
---
Description: 
I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
warnings in the log.
 I'm seeing these continuously, and want them fixed so that they won't repeat. 
Can someone please help me fix the below warnings?
{code}
WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 
3346 since the checkpointed offset 3332 is invalid. 
(kafka.log.LogCleanerManager$)
 WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 
4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
 WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 
203569 since the checkpointed offset 120955 is invalid. 
(kafka.log.LogCleanerManager$)
 WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 
16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)
{code}

  was:
I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
warnings in the log.
I'm seeing these continuously, and want them fixed so that they won't repeat. 
Can someone please help me fix the below warnings?

WARN Resetting first dirty offset of __consumer_offsets-17 to log start offset 
3346 since the checkpointed offset 3332 is invalid. 
(kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-23 to log start offset 
4 since the checkpointed offset 1 is invalid. (kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-19 to log start offset 
203569 since the checkpointed offset 120955 is invalid. 
(kafka.log.LogCleanerManager$)
WARN Resetting first dirty offset of __consumer_offsets-35 to log start offset 
16957 since the checkpointed offset 7 is invalid. (kafka.log.LogCleanerManager$)


> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>Priority: Major
>
> I upgraded Kafka from 0.10.2.1 to version 1.0.0. Since then, I see the below 
> warnings in the log.
>  I'm seeing these continuously, and want them fixed so that they won't repeat. 
> Can someone please help me fix the below warnings?
> {code}
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6650) The controller should be able to handle a partially deleted topic

2018-04-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6650.

   Resolution: Fixed
Fix Version/s: 2.0.0

merged the PR to trunk.

> The controller should be able to handle a partially deleted topic
> -
>
> Key: KAFKA-6650
> URL: https://issues.apache.org/jira/browse/KAFKA-6650
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.0.0
>
>
> A previous controller could have deleted some partitions of a topic from ZK, 
> but not all partitions, and then died.
> In that case, the new controller should be able to handle the partially 
> deleted topic, and finish the deletion.
> In the current code base, if there is no leadership info for a replica's 
> partition, the transition to OfflineReplica state for the replica will fail. 
> Afterwards the transition to ReplicaDeletionStarted will fail as well since 
> the only valid previous state for ReplicaDeletionStarted is OfflineReplica. 
> Furthermore, it means the topic deletion will never finish.
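
For context, the per-partition state that a partial deletion leaves behind lives in ZooKeeper and can be inspected with the bundled shell (a sketch; host/port and topic name are placeholders):

{code}
# partitions still registered for the topic
bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics/my-topic/partitions
# topics still queued for deletion
bin/zookeeper-shell.sh localhost:2181 ls /admin/delete_topics
{code}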



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6650) The controller should be able to handle a partially deleted topic

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440195#comment-16440195
 ] 

ASF GitHub Bot commented on KAFKA-6650:
---

junrao closed pull request #4825: KAFKA-6650: Allowing transition to 
OfflineReplica state for replicas without leadership info
URL: https://github.com/apache/kafka/pull/4825
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index a2d04e65ae6..5fafcc4fe3f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig,
   
controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), 
replica.topicPartition,
 deletePartition = false, (_, _) => ())
 }
-val replicasToRemoveFromIsr = validReplicas.filter(replica => 
controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
-val updatedLeaderIsrAndControllerEpochs = 
removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
+val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = 
validReplicas.partition { replica =>
+  
controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
+}
+val updatedLeaderIsrAndControllerEpochs = 
removeReplicasFromIsr(replicaId, 
replicasWithLeadershipInfo.map(_.topicPartition))
 updatedLeaderIsrAndControllerEpochs.foreach { case (partition, 
leaderIsrAndControllerEpoch) =>
   if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
 val recipients = 
controllerContext.partitionReplicaAssignment(partition).filterNot(_ == 
replicaId)
@@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig,
   logSuccessfulTransition(replicaId, partition, replicaState(replica), 
OfflineReplica)
   replicaState.put(replica, OfflineReplica)
 }
+
+replicasWithoutLeadershipInfo.foreach { replica =>
+  logSuccessfulTransition(replicaId, replica.topicPartition, 
replicaState(replica), OfflineReplica)
+  replicaState.put(replica, OfflineReplica)
+}
   case ReplicaDeletionStarted =>
 validReplicas.foreach { replica =>
   logSuccessfulTransition(replicaId, replica.topicPartition, 
replicaState(replica), ReplicaDeletionStarted)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 9b58fc7cc4b..a65128ad98a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
* @return true if path gets deleted successfully, false if root path 
doesn't exist
* @throws KeeperException if there is an error while deleting the znodes
*/
-  private[zk] def deleteRecursive(path: String): Boolean = {
+  def deleteRecursive(path: String): Boolean = {
 val getChildrenResponse = 
retryRequestUntilConnected(GetChildrenRequest(path))
 getChildrenResponse.resultCode match {
   case Code.OK =>
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ef455d4457e..4c033c421bb 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -17,7 +17,7 @@
 package kafka.admin
 
 import kafka.log.Log
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
@@ -326,7 +326,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 brokerConfigs.head.setProperty("log.segment.bytes","100")
 brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
-servers = createTestTopicAndCluster(topic,brokerConfigs)
+servers = createTestTopicAndCluster(topic, brokerConfigs, 
expectedReplicaAssignment)
 
 // for simplicity, we are validating cleaner offsets on a single broker
 val server = servers.head
@@ -363,18 +363,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: 
Boolean = true): Seq[KafkaServer] = {
+  private def createTestTopic

[jira] [Commented] (KAFKA-6796) Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440133#comment-16440133
 ] 

ASF GitHub Bot commented on KAFKA-6796:
---

hachikuji opened a new pull request #4883: KAFKA-6796; Fix surprising 
UNKNOWN_TOPIC error from requests to non-replicas
URL: https://github.com/apache/kafka/pull/4883
 
 
   Currently if the client sends a produce request or a fetch request to a 
broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a 
bit surprising to see when the topic actually exists. It would be better to 
return NOT_LEADER to avoid confusion. Clients typically handle both errors by 
refreshing metadata and retrying, so changing this should not cause any change 
in behavior on the client. This case can be hit following a partition 
reassignment after the leader is moved and the local replica is deleted.
   
   To validate the current behavior and the fix, I've added  integration tests 
for the fetch and produce APIs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas
> -
>
> Key: KAFKA-6796
> URL: https://issues.apache.org/jira/browse/KAFKA-6796
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Currently if the client sends a produce request or a fetch request to a 
> broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a 
> bit surprising to see when the topic actually exists. It would be better to 
> return NOT_LEADER to avoid confusion. Clients typically handle both errors by 
> refreshing metadata and retrying, so changing this should not cause any 
> change in behavior on the client. This case can be hit following a partition 
> reassignment after the leader is moved and the local replica is deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6796) Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas

2018-04-16 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6796:
--

 Summary: Surprising UNKNOWN_TOPIC error for produce/fetch requests 
to non-replicas
 Key: KAFKA-6796
 URL: https://issues.apache.org/jira/browse/KAFKA-6796
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.1, 1.1.0
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Currently if the client sends a produce request or a fetch request to a broker 
which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a bit 
surprising to see when the topic actually exists. It would be better to return 
NOT_LEADER to avoid confusion. Clients typically handle both errors by 
refreshing metadata and retrying, so changing this should not cause any change 
in behavior on the client. This case can be hit following a partition 
reassignment after the leader is moved and the local replica is deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-04-16 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440047#comment-16440047
 ] 

Srinivas Dhruvakumar edited comment on KAFKA-6649 at 4/16/18 9:27 PM:
--

The Offset Out Of Range Exception has been fixed, but we hit the error below 
while we were in the middle of our release.

We got an exception while releasing the latest patch on one of the clusters. Is 
this expected?

{noformat}
2018-04-13 00:14:18,344 [ReplicaFetcherThread-2-348] ERROR (kafka.server.ReplicaFetcherThread) - [ReplicaFetcherThread-2-348]: Error due to
kafka.common.KafkaException: error processing data for partition [topic1-0,0] offset 325231
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:208)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.IllegalArgumentException: High watermark offset should be non-negative
 at kafka.cluster.Replica.highWatermark_$eq(Replica.scala:144)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:109)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:186)
 ... 13 more
{noformat}


was (Author: srinivas.d...@gmail.com):
The Offset Out Of Range Exception has been fixed, but we hit the error below 
while we were in the middle of our release.

We got an exception while releasing the latest patch on one of the clusters.

{noformat}
2018-04-13 00:14:18,344 [ReplicaFetcherThread-2-348] ERROR (kafka.server.ReplicaFetcherThread) - [ReplicaFetcherThread-2-348]: Error due to
kafka.common.KafkaException: error processing data for partition [topic1-0,0] offset 325231
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:208)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.IllegalArgumentException: High watermark offset should be non-negative
 at kafka.cluster.Replica.highWatermark_$eq(Replica.scala:144)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:109)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFet
{noformat}

[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-04-16 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440047#comment-16440047
 ] 

Srinivas Dhruvakumar commented on KAFKA-6649:
-

The Offset Out Of Range Exception has been fixed, but we hit the error below 
while we were in the middle of our release.

We got an exception while releasing the latest patch on one of the clusters.

{noformat}
2018-04-13 00:14:18,344 [ReplicaFetcherThread-2-348] ERROR (kafka.server.ReplicaFetcherThread) - [ReplicaFetcherThread-2-348]: Error due to
kafka.common.KafkaException: error processing data for partition [topic1-0,0] offset 325231
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:208)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.IllegalArgumentException: High watermark offset should be non-negative
 at kafka.cluster.Replica.highWatermark_$eq(Replica.scala:144)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:109)
 at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:186)
 ... 13 more
{noformat}

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2

[jira] [Issue Comment Deleted] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-04-16 Thread Srinivas Dhruvakumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Srinivas Dhruvakumar updated KAFKA-6649:

Comment: was deleted

(was: [~hachikuji] - Sorry for the miscommunication. We had an internal bug. I 
can confirm that the fix works and is no longer an issue.  This bug is fixed as 
part of -KAFKA-3978-. patch)

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or the exception should be caught and we should 
> keep calling doWork()
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread

2018-04-16 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439991#comment-16439991
 ] 

Dong Lin commented on KAFKA-6795:
-

Thanks for adding more tests [~apovzner]. Currently ReplicaAlterLogDirsThread 
is indirectly tested in AdminClientIntegrationTest.testAlterReplicaLogDirs(). 
Not sure if this test does what you are looking for.

> Add unit test for ReplicaAlterLogDirsThread
> ---
>
> Key: KAFKA-6795
> URL: https://issues.apache.org/jira/browse/KAFKA-6795
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Major
>
> ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
> there is no unit test. 
> [~lindong] I assigned this to myself, since ideally I wanted to add unit 
> tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread

2018-04-16 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6795:
---

 Summary: Add unit test for ReplicaAlterLogDirsThread
 Key: KAFKA-6795
 URL: https://issues.apache.org/jira/browse/KAFKA-6795
 Project: Kafka
  Issue Type: Improvement
Reporter: Anna Povzner
Assignee: Anna Povzner


ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
there is no unit test. 

[~lindong] I assigned this to myself, since ideally I wanted to add unit tests 
for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439970#comment-16439970
 ] 

ASF GitHub Bot commented on KAFKA-6361:
---

apovzner opened a new pull request #4882:  KAFKA-6361: Fix log divergence 
between leader and follower after fast leader fail over
URL: https://github.com/apache/kafka/pull/4882
 
 
   WIP - will add a few more unit tests.
   
   Implementation of KIP-279 as described here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over
   
   In summary:
   - Added leader_epoch to OFFSET_FOR_LEADER_EPOCH_RESPONSE
   - The leader replies with the pair (largest epoch less than or equal to the 
requested epoch, end offset of that epoch)
   - If the follower does not know about the leader epoch that the leader replied 
with, it truncates to the end offset of its largest leader epoch that is less 
than the epoch the leader replied with, and sends another OffsetForLeaderEpoch 
request containing that epoch.
   
   Added integration test 
EpochDrivenReplicationProtocolAcceptanceTest.logsShouldNotDivergeOnUncleanLeaderElections
 that does 3 fast leader changes where unclean leader election is enabled and 
min isr is 1. The test failed before the fix was implemented.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>  Labels: reliability
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and

[jira] [Comment Edited] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-04-16 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439951#comment-16439951
 ] 

Srinivas Dhruvakumar edited comment on KAFKA-6649 at 4/16/18 7:51 PM:
--

[~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can 
confirm that the fix works and is no longer an issue.  This bug is fixed as 
part of -KAFKA-3978-. patch


was (Author: srinivas.d...@gmail.com):
[~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can 
confirm that the fix works and is no longer an issue.  This bug is fixed as 
part of KAFKA-3978.

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or the exception should be caught and we should 
> keep calling doWork()
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-04-16 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439951#comment-16439951
 ] 

Srinivas Dhruvakumar commented on KAFKA-6649:
-

[~hachikuji] - Sorry for the miscommunication. We had an internal bug. I can 
confirm that the fix works and is no longer an issue.  This bug is fixed as 
part of KAFKA-3978.

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replication partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or the exception should be caught and we should 
> keep calling doWork()
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-4327) Move Reset Tool from core to streams

2018-04-16 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4327.
--
Resolution: Won't Fix

I'm resolving this ticket as won't fix for now.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker backward 
> compatibility -- not all brokers support the "topic delete request" and thus 
> the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2018-04-16 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439943#comment-16439943
 ] 

Guozhang Wang commented on KAFKA-4327:
--

As we discussed before, it may not be worth the effort, since 1) most users are 
likely going to call the `streams reset` shell script, in which case they are 
likely to have the core package installed anyway, and 2) thinking about it again, 
leaving the reset tool in `kafka.tools` is okay, since that is where we put all 
the admin tooling classes; personally I think we could also consider 
`o.a.k.tools`, but again that is not very significant either.

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker backward 
> compatibility -- not all brokers support the "topic delete request" and thus 
> the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-04-16 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6514.

   Resolution: Fixed
Fix Version/s: 1.2.0

> Add API version as a tag for the RequestsPerSec metric
> --
>
> Key: KAFKA-6514
> URL: https://issues.apache.org/jira/browse/KAFKA-6514
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Allen Wang
>Priority: Major
> Fix For: 1.2.0
>
>
> After we upgrade the brokers to a new version, one important insight is how 
> many clients have been upgraded, so that we can switch the message format once 
> most of the clients have also been updated to the new version, to minimize the 
> performance penalty.
> RequestsPerSec with the version tag will give us that insight.
>  
>  
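
For illustration (hypothetical host/port and JMX settings; the object name assumes the version tag added by this change), the per-version request rate could be read with the bundled JmxTool:

{code}
bin/kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=*'
{code}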



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6727) org.apache.kafka.clients.admin.Config has broken equals and hashCode method.

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439922#comment-16439922
 ] 

ASF GitHub Bot commented on KAFKA-6727:
---

hachikuji closed pull request #4796: KAFKA-6727 fix broken Config hashCode() 
and equals()
URL: https://github.com/apache/kafka/pull/4796
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
index 56f9c270792..c81c0b60274 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Config.java
@@ -21,39 +21,40 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * A configuration object containing the configuration entries for a resource.
- *
+ * 
  * The API of this class is evolving, see {@link AdminClient} for details.
  */
 @InterfaceStability.Evolving
 public class Config {
 
-private final Collection<ConfigEntry> entries;
+private final Map<String, ConfigEntry> entries = new HashMap<>();
 
 /**
  * Create a configuration instance with the provided entries.
  */
 public Config(Collection<ConfigEntry> entries) {
-this.entries = Collections.unmodifiableCollection(entries);
+for (ConfigEntry entry : entries) {
+this.entries.put(entry.name(), entry);
+}
 }
 
 /**
  * Configuration entries for a resource.
  */
 public Collection<ConfigEntry> entries() {
-return entries;
+return Collections.unmodifiableCollection(entries.values());
 }
 
 /**
  * Get the configuration entry with the provided name or null if there 
isn't one.
  */
 public ConfigEntry get(String name) {
-for (ConfigEntry entry : entries)
-if (entry.name().equals(name))
-return entry;
-return null;
+return entries.get(name);
 }
 
 @Override
@@ -75,6 +76,6 @@ public int hashCode() {
 
 @Override
 public String toString() {
-return "Config(entries=" + entries + ")";
+return "Config(entries=" + entries.values() + ")";
 }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
new file mode 100644
index 000..45b174bec19
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ConfigTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class ConfigTest {
+private static final ConfigEntry E1 = new ConfigEntry("a", "b");
+private static final ConfigEntry E2 = new ConfigEntry("c", "d");
+private Config config;
+
+@Before
+public void setUp() {
+final Collection<ConfigEntry> entries = new ArrayList<>();
+entries.add(E1);
+entries.add(E2);
+
+config = new Config(entries);
+}
+
+@Test
+public void shouldGetEntry() {
+assertThat(config.get("a"), is(E1));
+assertThat(config.get("c"), is(E2));
+}
+
+@Test
+public void shouldReturnNullOnGetUnknownEntry() {
+assertThat(config.get("unknown"), is(nullValue()));
+}
+
+@Test
+public void shouldGetAllEntries() {
+assertThat(config.entries().size(), is(2));
+assertThat(config.entries(), hasItem
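
The gist of the change above is that {{Config}} now stores entries keyed by name, so lookups are constant-time and equality no longer depends on the iteration order of the underlying collection. A minimal sketch of the behaviour the fix is after, using only the public constructor and assuming the patched equals/hashCode compare the entry map:

{code:java}
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import java.util.Arrays;

public class ConfigEqualityExample {
    public static void main(String[] args) {
        ConfigEntry retention = new ConfigEntry("retention.ms", "604800000");
        ConfigEntry cleanup = new ConfigEntry("cleanup.policy", "compact");

        // Same entries, different insertion order.
        Config a = new Config(Arrays.asList(retention, cleanup));
        Config b = new Config(Arrays.asList(cleanup, retention));

        // With the map-based storage these agree regardless of ordering.
        System.out.println(a.equals(b));                      // expected: true
        System.out.println(a.hashCode() == b.hashCode());     // expected: true
        System.out.println(a.get("cleanup.policy").value());  // expected: compact
    }
}
{code}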

[jira] [Commented] (KAFKA-4327) Move Reset Tool from core to streams

2018-04-16 Thread Jorge Quilcate (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439821#comment-16439821
 ] 

Jorge Quilcate commented on KAFKA-4327:
---

Now that 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API]
 is merged, does it make sense to reopen this issue and move `StreamsResetter` 
to the `streams` module? Or should we just keep it in the `core` module and close this issue? 

> Move Reset Tool from core to streams
> 
>
> Key: KAFKA-4327
> URL: https://issues.apache.org/jira/browse/KAFKA-4327
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Minor
>
> This is a follow up on https://issues.apache.org/jira/browse/KAFKA-4008
> Currently, Kafka Streams Application Reset Tool is part of {{core}} module 
> due to ZK dependency. After KIP-4 got merged, this dependency can be dropped 
> and the Reset Tool can be moved to {{streams}} module.
> This should also update {{InternalTopicManager#filterExistingTopics}}, which 
> refers to the ResetTool in an exception message:
> {{"Use 'kafka.tools.StreamsResetter' tool"}}
> -> {{"Use '" + kafka.tools.StreamsResetter.getClass().getName() + "' tool"}}
> Doing this JIRA also requires updating the docs with regard to broker 
> backward compatibility -- not all brokers support the "topic delete request" and 
> thus, the reset tool will not be backward compatible with all broker versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6792) Wrong pointer in the link for stream dsl

2018-04-16 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6792.
--
   Resolution: Fixed
Fix Version/s: 1.0.0

> Wrong pointer in the link for stream dsl
> 
>
> Key: KAFKA-6792
> URL: https://issues.apache.org/jira/browse/KAFKA-6792
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: robin m
>Priority: Major
> Fix For: 1.0.0
>
>
> Wrong pointer in the link for the Streams DSL.
> The actual link is: 
> [http://kafka.apache.org/11/documentation/streams/developer-guide#streams|http://kafka.apache.org/11/documentation/streams/developer-guide/#streams]_dsl
> The correct link is: 
> http://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#streams-dsl



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6794) Support for incremental replica reassignment

2018-04-16 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-6794:
---
Description: 
Say you have a replication factor of 4 and you trigger a reassignment which 
moves all replicas to new brokers. Now 8 replicas are fetching at the same time 
which means you need to account for 8 times the current producer load plus the 
catch-up replication. To make matters worse, the replicas won't all become 
in-sync at the same time; in the worst case, you could have 7 replicas in-sync 
while one is still catching up. Currently, the old replicas won't be disabled 
until all new replicas are in-sync. This makes configuring the throttle tricky 
since ISR traffic is not subject to it.

Rather than trying to bring all 4 new replicas online at the same time, a 
friendlier approach would be to do it incrementally: bring one replica online, 
bring it in-sync, then remove one of the old replicas. Repeat until all 
replicas have been changed. This would reduce the impact of a reassignment and 
make configuring the throttle easier at the cost of a slower overall 
reassignment.

  was:
Say you have a replication factor of 4 and you trigger a reassignment which 
moves all replicas to new brokers. Now 8 replicas are fetching at the same time 
which means you need to account for 8 times the current load plus the catch-up 
replication. To make matters worse, the replicas won't all become in-sync at 
the same time; in the worst case, you could have 7 replicas in-sync while one 
is still catching up. Currently, the old replicas won't be disabled until all 
new replicas are in-sync. This makes configuring the throttle tricky since ISR 
traffic is not subject to it.

Rather than trying to bring all 4 new replicas online at the same time, a 
friendlier approach would be to do it incrementally: bring one replica online, 
bring it in-sync, then remove one of the old replicas. Repeat until all 
replicas have been changed. This would reduce the impact of a reassignment and 
make configuring the throttle easier at the cost of a slower overall 
reassignment.


> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.
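
To make the incremental idea concrete, here is a small, purely illustrative sketch (not an existing Kafka tool or API) that computes the sequence of intermediate replica assignments for one partition, adding a single new replica per step and retiring a single old one once the new replica is in-sync:

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class IncrementalReassignmentPlanner {

    // Given current and target replica lists for one partition, emit one
    // intermediate assignment per step: add one new replica, and once it is
    // in-sync, drop one old replica before moving to the next step.
    public static List<List<Integer>> steps(List<Integer> current, List<Integer> target) {
        List<Integer> toAdd = new ArrayList<>(target);
        toAdd.removeAll(current);
        List<Integer> toRemove = new ArrayList<>(current);
        toRemove.removeAll(target);

        List<List<Integer>> plan = new ArrayList<>();
        LinkedHashSet<Integer> working = new LinkedHashSet<>(current);
        for (int i = 0; i < toAdd.size(); i++) {
            working.add(toAdd.get(i));                 // bring one new replica online
            if (i < toRemove.size()) {
                working.remove(toRemove.get(i));       // retire one old replica once the new one is in-sync
            }
            plan.add(new ArrayList<>(working));
        }
        for (int i = toAdd.size(); i < toRemove.size(); i++) {
            working.remove(toRemove.get(i));           // handle shrinking replication factors
            plan.add(new ArrayList<>(working));
        }
        return plan;
    }

    public static void main(String[] args) {
        // Moving an RF=4 partition from brokers 1-4 to brokers 5-8, one replica at a time.
        steps(List.of(1, 2, 3, 4), List.of(5, 6, 7, 8)).forEach(System.out::println);
    }
}
{code}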



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6794) Support for incremental replica reassignment

2018-04-16 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6794:
--

 Summary: Support for incremental replica reassignment
 Key: KAFKA-6794
 URL: https://issues.apache.org/jira/browse/KAFKA-6794
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Say you have a replication factor of 4 and you trigger a reassignment which 
moves all replicas to new brokers. Now 8 replicas are fetching at the same time 
which means you need to account for 8 times the current load plus the catch-up 
replication. To make matters worse, the replicas won't all become in-sync at 
the same time; in the worst case, you could have 7 replicas in-sync while one 
is still catching up. Currently, the old replicas won't be disabled until all 
new replicas are in-sync. This makes configuring the throttle tricky since ISR 
traffic is not subject to it.

Rather than trying to bring all 4 new replicas online at the same time, a 
friendlier approach would be to do it incrementally: bring one replica online, 
bring it in-sync, then remove one of the old replicas. Repeat until all 
replicas have been changed. This would reduce the impact of a reassignment and 
make configuring the throttle easier at the cost of a slower overall 
reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2018-04-16 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner reassigned KAFKA-6361:
---

Assignee: Anna Povzner  (was: Jason Gustafson)

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>  Labels: reliability
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would likely 
> go unnoticed in practice and everything is fine in the third case. To 
> workaround the issue, we deleted the active segment on the replica which 
> allowed it to re-replicate consistently from the leader.
> I'm not sure the best solution for this scenario. Maybe if the leader isn't 
> aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
> instead of using the offset of the next highest epoch. That would cause the 
> follower to truncate using its high watermark. Or perhaps instead of doing 
> so, it could send another OffsetForLeaderEpoch request at the next previous 
> cached epoch and then truncate using that. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439737#comment-16439737
 ] 

ASF GitHub Bot commented on KAFKA-6514:
---

hachikuji closed pull request #4506: KAFKA-6514: Add API version as a tag for 
the RequestsPerSec metric
URL: https://github.com/apache/kafka/pull/4506
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 8a17528bfb7..f03bdeb9dd9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -158,7 +158,7 @@ object RequestChannel extends Logging {
   val metricNames = fetchMetricNames :+ header.apiKey.name
   metricNames.foreach { metricName =>
 val m = metrics(metricName)
-m.requestRate.mark()
+m.requestRate(header.apiVersion).mark()
 m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
 m.localTimeHist.update(Math.round(apiLocalTimeMs))
 m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
@@ -350,10 +350,11 @@ object RequestMetrics {
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
+
   import RequestMetrics._
 
   val tags = Map("request" -> name)
-  val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, 
tags)
+  val requestRateInternal = new mutable.HashMap[Short, Meter]
   // time a request spent in a request queue
   val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, 
tags)
   // time a request takes to be processed at the local broker
@@ -386,6 +387,10 @@ class RequestMetrics(name: String) extends 
KafkaMetricsGroup {
   private val errorMeters = mutable.Map[Errors, ErrorMeter]()
   Errors.values.foreach(error => errorMeters.put(error, new ErrorMeter(name, 
error)))
 
+  def requestRate(version: Short): Meter = {
+  requestRateInternal.getOrElseUpdate(version, newMeter("RequestsPerSec", 
"requests", TimeUnit.SECONDS, tags + ("version" -> version.toString)))
+  }
+
   class ErrorMeter(name: String, error: Errors) {
 private val tags = Map("request" -> name, "error" -> error.name)
 
@@ -418,7 +423,7 @@ class RequestMetrics(name: String) extends 
KafkaMetricsGroup {
   }
 
   def removeMetrics(): Unit = {
-removeMetric(RequestsPerSec, tags)
+for (version <- requestRateInternal.keySet) removeMetric(RequestsPerSec, 
tags + ("version" -> version.toString))
 removeMetric(RequestQueueTimeMs, tags)
 removeMetric(LocalTimeMs, tags)
 removeMetric(RemoteTimeMs, tags)
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index e6dadbb4253..7d3b42897cc 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -688,10 +688,14 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testRequestMetricsAfterStop(): Unit = {
 server.stopProcessingRequests()
-
-server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate.mark()
+val version = ApiKeys.PRODUCE.latestVersion
+val version2 = (version - 1).toShort
+for (_ <- 0 to 1) 
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+assertEquals(2, 
server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
 server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE 
-> 1))
-val nonZeroMeters = 
Map("kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce" -> 
1,
+val nonZeroMeters = 
Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version"
 -> 2,
+
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2"
 -> 1,
 
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE"
 -> 1)
 
 def requestMetricMeters = YammerMetrics
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 95f2c418c3b..d369d1de79d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,6 +68,12 @@ Notable changes in 1
 
 KIP-186 (https://cwiki.apache.org/confluence/x/oYtjB) 
increases the default offset retention time from 1 day to 7 days. This makes it 
less likely to "lose" offsets in an application that commits infrequently. It 
also increases the active set of offsets and therefore can increase memory 
usage on the broker. Note that the console consumer currently enables offs

[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-04-16 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439599#comment-16439599
 ] 

Jason Gustafson commented on KAFKA-6649:


[~srinivas.d...@gmail.com] I think those log messages are just a little 
misleading and not necessarily indicative of a problem. When the leader loads 
the replica state, it only knows the local high watermark. Other replicas are 
initialized to 0 until they begin fetching. This is normal. Can you clarify 
what you mean when you say it is causing topics to replay?

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or whether the exception should be caught so that 
> we keep calling doWork().
>  
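
For illustration, here is a minimal sketch (in Java, not the broker's actual Scala code) of the alternative behaviour the reporter asks about, where non-fatal exceptions are caught inside the loop so a single failed iteration does not silently stop the fetcher thread:

{code:java}
public abstract class ResilientShutdownableThread extends Thread {
    private volatile boolean running = true;

    public abstract void doWork() throws Exception;

    @Override
    public void run() {
        while (running) {
            try {
                doWork();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;                       // treat interruption as a shutdown signal
            } catch (Exception e) {
                // Log and keep going instead of leaving the while loop.
                System.err.println("Non-fatal error in doWork, continuing: " + e);
            }
        }
    }

    public void shutdown() {
        running = false;
    }
}
{code}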



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5975) No response when deleting topics and delete.topic.enable=false

2018-04-16 Thread Mario Molina (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439572#comment-16439572
 ] 

Mario Molina edited comment on KAFKA-5975 at 4/16/18 3:19 PM:
--

What about this fix? [~ewencp] [~rsivaram]


was (Author: mmolimar):
What about this fix [~ewencp]?

> No response when deleting topics and delete.topic.enable=false
> --
>
> Key: KAFKA-5975
> URL: https://issues.apache.org/jira/browse/KAFKA-5975
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Minor
> Fix For: 1.0.2
>
>
> When trying to delete topics using the KafkaAdminClient while the broker 
> config is set to 'delete.topic.enable=false', the client cannot get a 
> response and fails with a timeout error. This is because the 
> DelayedCreatePartitions object cannot complete the operation.
> This bug fix modifies the handling of the DELETE_TOPICS API key in KafkaApis, 
> taking into account that the flag can be disabled and swallowing the error for 
> the client; that is, the topic is never removed and no error is returned to the client.
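
For context, this is roughly how the hang surfaces on the client side. A minimal sketch using the public AdminClient API; the bootstrap address and topic name are placeholders:

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeleteTopicExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            try {
                admin.deleteTopics(Collections.singletonList("test-topic"))
                     .all()
                     .get(30, TimeUnit.SECONDS);
                System.out.println("Topic deleted");
            } catch (TimeoutException e) {
                // With delete.topic.enable=false the future may never complete (this issue).
                System.err.println("Delete request timed out: " + e);
            } catch (ExecutionException | InterruptedException e) {
                System.err.println("Delete failed: " + e.getCause());
            }
        }
    }
}
{code}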



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5975) No response when deleting topics and delete.topic.enable=false

2018-04-16 Thread Mario Molina (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439572#comment-16439572
 ] 

Mario Molina commented on KAFKA-5975:
-

What about this fix [~ewencp]?

> No response when deleting topics and delete.topic.enable=false
> --
>
> Key: KAFKA-5975
> URL: https://issues.apache.org/jira/browse/KAFKA-5975
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Minor
> Fix For: 1.0.2
>
>
> When trying to delete topics using the KafkaAdminClient while the broker 
> config is set to 'delete.topic.enable=false', the client cannot get a 
> response and fails with a timeout error. This is because the 
> DelayedCreatePartitions object cannot complete the operation.
> This bug fix modifies the handling of the DELETE_TOPICS API key in KafkaApis, 
> taking into account that the flag can be disabled and swallowing the error for 
> the client; that is, the topic is never removed and no error is returned to the client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439502#comment-16439502
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4880:  KAFKA-6054: Update Kafka Streams 
metadata to version 3
URL: https://github.com/apache/kafka/pull/4880
 
 
   - adds Streams upgrade tests for 1.1 release


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue th

[jira] [Created] (KAFKA-6793) Unnecessary warning log message

2018-04-16 Thread Anna O (JIRA)
Anna O created KAFKA-6793:
-

 Summary: Unnecessary warning log message 
 Key: KAFKA-6793
 URL: https://issues.apache.org/jira/browse/KAFKA-6793
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.0
Reporter: Anna O


After upgrading Kafka Streams from 0.11.0.2 to 1.1.0, the following warning log 
message started to appear:

level: WARN
logger: org.apache.kafka.clients.consumer.ConsumerConfig
message: The configuration 'admin.retries' was supplied but isn't a known 
config.

The config is not explicitly supplied to the Streams application.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6771) Make specifying partitions in RoundTripWorkload, ProduceBench more flexible

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439088#comment-16439088
 ] 

ASF GitHub Bot commented on KAFKA-6771:
---

rajinisivaram closed pull request #4850: KAFKA-6771. Make specifying partitions 
more flexible
URL: https://github.com/apache/kafka/pull/4850
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java 
b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
new file mode 100644
index 000..82f5003fbb6
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utilities for expanding strings that have range expressions in them.
+ *
+ * For example, 'foo[1-3]' would be expanded to foo1, foo2, foo3.
+ * Strings that have no range expressions will not be expanded.
+ */
+public class StringExpander {
+private final static Pattern NUMERIC_RANGE_PATTERN =
+Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)");
+
+public static HashSet<String> expand(String val) {
+HashSet<String> set = new HashSet<>();
+Matcher matcher = NUMERIC_RANGE_PATTERN.matcher(val);
+if (!matcher.matches()) {
+set.add(val);
+return set;
+}
+String prequel = matcher.group(1);
+String rangeStart = matcher.group(2);
+String rangeEnd = matcher.group(3);
+String epilog = matcher.group(4);
+int rangeStartInt = Integer.parseInt(rangeStart);
+int rangeEndInt = Integer.parseInt(rangeEnd);
+if (rangeEndInt < rangeStartInt) {
+throw new RuntimeException("Invalid range: start " + rangeStartInt 
+
+" is higher than end " + rangeEndInt);
+}
+for (int i = rangeStartInt; i <= rangeEndInt; i++) {
+set.add(String.format("%s%d%s", prequel, i, epilog));
+}
+return set;
+}
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 0677296ee3c..f2eb0343aa8 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -132,7 +132,7 @@ public StopTaskResponse stopTask(StopTaskRequest request) 
throws Exception {
 
 public TasksResponse tasks(TasksRequest request) throws Exception {
 UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
-uriBuilder.queryParam("taskId", request.taskIds().toArray(new 
String[0]));
+uriBuilder.queryParam("taskId", (Object[]) 
request.taskIds().toArray(new String[0]));
 uriBuilder.queryParam("firstStartMs", request.firstStartMs());
 uriBuilder.queryParam("lastStartMs", request.lastStartMs());
 uriBuilder.queryParam("firstEndMs", request.firstEndMs());
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index cef913bc01a..1b429ead3c8 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -41,10 +41,7 @@
 private final Map<String, String> consumerConf;
 private final Map<String, String> adminClientConf;
 private final Map<String, String> commonClientConf;
-private final String topicRegex;
-private final int startPartition;
-private final int endPartition;
-
+private final TopicsSpec activeTopics;
 
 @JsonCreator
 public ConsumeBenchSpec(@Json

[jira] [Commented] (KAFKA-6696) Trogdor should support destroying tasks

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439086#comment-16439086
 ] 

ASF GitHub Bot commented on KAFKA-6696:
---

rajinisivaram closed pull request #4759: KAFKA-6696 Trogdor should support 
destroying tasks
URL: https://github.com/apache/kafka/pull/4759
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2767132886d..64258bf7b07 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -45,7 +45,7 @@
 
 
+  
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest).java"/>
 
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
index 3b5b21e68d8..0324d2d2dba 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
@@ -27,10 +27,9 @@
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,13 +94,16 @@ public AgentStatusResponse status() throws Exception {
 return new AgentStatusResponse(serverStartMs, 
workerManager.workerStates());
 }
 
-public CreateWorkerResponse createWorker(CreateWorkerRequest req) throws 
Exception {
-workerManager.createWorker(req.id(), req.spec());
-return new CreateWorkerResponse(req.spec());
+public void createWorker(CreateWorkerRequest req) throws Throwable {
+workerManager.createWorker(req.workerId(), req.taskId(), req.spec());
 }
 
-public StopWorkerResponse stopWorker(StopWorkerRequest req) throws 
Exception {
-return new StopWorkerResponse(workerManager.stopWorker(req.id()));
+public void stopWorker(StopWorkerRequest req) throws Throwable {
+workerManager.stopWorker(req.workerId(), false);
+}
+
+public void destroyWorker(DestroyWorkerRequest req) throws Throwable {
+workerManager.stopWorker(req.workerId(), true);
 }
 
 public static void main(String[] args) throws Exception {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java 
b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 08769a0971d..c89011b8650 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -27,15 +27,16 @@
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
-import org.apache.kafka.trogdor.rest.CreateWorkerResponse;
+import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.StopWorkerResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.UriBuilder;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
@@ -116,20 +117,29 @@ public AgentStatusResponse status() throws Exception {
 return resp.body();
 }
 
-public CreateWorkerResponse createWorker(CreateWorkerRequest request) 
throws Exception {
-HttpResponse<CreateWorkerResponse> resp =
-JsonRestServer.httpRequest(
+public void createWorker(CreateWorkerRequest request) throws Exception {
+HttpResponse<Empty> resp =
+JsonRestServer.httpRequest(
 url("/agent/worker/create"), "POST",
-request, new TypeReference<CreateWorkerResponse>() { }, 
maxTries);
-return resp.body();
+request, new TypeReference<Empty>() { }, maxTries);
+resp.body();
 }
 
-public StopWorkerResponse stopWorker(StopWorkerRequest request) throws 
Exception {
-HttpResponse resp =
-JsonRestServer.httpRequest(url(
+public void stopWorker(StopWorkerRequest request) throws Exception {
+HttpResponse<Empty> resp =
+JsonRestServer.httpReque

[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439050#comment-16439050
 ] 

ASF GitHub Bot commented on KAFKA-6788:
---

cyrusv opened a new pull request #4878: KAFKA-6788: Combine queries for 
describe and delete groups in AdminCl…
URL: https://github.com/apache/kafka/pull/4878
 
 
   1 Ask coordinator to list its groups
   2 Try to delete/describe them all
   3 Track which entries are successfully deleted or described
   4 Continue over successfully deleted/described entries
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multiple group ids being sent within the same request. So we could optimize 
> this by grouping the requests per coordinator destination.
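
A minimal sketch of the grouping idea follows; the coordinator lookup is a stand-in for the admin client's FindCoordinator round-trip, and all names here are illustrative rather than actual KafkaAdminClient internals:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CoordinatorBatcher {

    // Maps each coordinator (identified by broker id here) to the group ids it hosts,
    // so that a single DescribeGroups/DeleteGroups request can be sent per coordinator.
    public static Map<Integer, List<String>> groupByCoordinator(
            List<String> groupIds, Function<String, Integer> findCoordinator) {
        Map<Integer, List<String>> byCoordinator = new HashMap<>();
        for (String groupId : groupIds) {
            int coordinator = findCoordinator.apply(groupId);
            byCoordinator.computeIfAbsent(coordinator, k -> new ArrayList<>()).add(groupId);
        }
        return byCoordinator;
    }

    public static void main(String[] args) {
        // Toy coordinator lookup: hash the group id onto one of three brokers.
        Map<Integer, List<String>> batches = groupByCoordinator(
                List.of("orders", "payments", "audit", "metrics"),
                groupId -> Math.abs(groupId.hashCode()) % 3);
        batches.forEach((broker, groups) ->
                System.out.println("broker " + broker + " -> " + groups));
    }
}
{code}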



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)