[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-07-30 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8733:
--
Description: 
We found the offline-partitions issue multiple times on some of the hosts in our
clusters. After going through the broker logs and the hosts' disk stats, it looks
like this issue occurs whenever read/write operations take more time on that disk.
In a particular case where the read time is more than replica.lag.time.max.ms,
follower replicas go out of sync because their earlier fetch requests are stuck
reading the local log and their fetch status has not yet been updated, as shown in
the `ReplicaManager` code below. If reading data from the log takes longer than
replica.lag.time.max.ms, then all the replicas fall out of sync and the partition
becomes offline if min.insync.replicas > 1 and unclean.leader.election.enable is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  // this call took more than `replica.lag.time.max.ms`
  val result = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // Follower fetch times get updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR.
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}

val logReadResults = readFromLog()
{code}
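For illustration, here is a minimal Java sketch (hypothetical names, not the actual ReplicaManager/Partition code) of the interaction described above: the leader's periodic ISR-shrink check looks only at each follower's last-caught-up timestamp, and that timestamp is refreshed by updateFollowerLogReadResults() only after the local read returns, so a read stuck for longer than replica.lag.time.max.ms makes the follower look out of sync even though its fetch request is pending.

{code:java}
// Illustrative sketch only; class and method names are hypothetical, not Kafka's.
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class IsrShrinkSketch {
    // A follower is considered out of sync when its last-caught-up timestamp is older
    // than replica.lag.time.max.ms. A fetch stuck in the local log read never refreshes
    // that timestamp, so the follower is shrunk out of the ISR while its request is pending.
    static Set<Integer> outOfSyncReplicas(Map<Integer, Long> lastCaughtUpTimeMs,
                                          long nowMs,
                                          long replicaLagTimeMaxMs) {
        return lastCaughtUpTimeMs.entrySet().stream()
                .filter(e -> nowMs - e.getValue() > replicaLagTimeMaxMs)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
{code}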
Attached are graphs of the disk weighted I/O time stats from when this issue occurred.

I will raise a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
 describing options on how to handle this scenario.

 

  was:
We found offline partitions issue multiple times on some of the hosts in our 
clusters. After going through the broker logs and hosts’s disk stats, it looks 
like this issue occurs whenever the read/write operations take more time on 
that disk. In a particular case where read time is more than the 
replica.lag.time.max.ms, follower replicas will be out of sync as their earlier 
fetch requests are stuck while reading the local log and their fetch status is 
not yet updated as mentioned in the below code of `ReplicaManager`. If there is 
an issue in reading the data from the log for a duration more than 
replica.lag.time.max.ms then all the replicas will be out of sync and partition 
becomes offline if min.isr.replicas > 1 and unclean.leader.election is disabled.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than 
`replica.lag.time.max.ms`
  replicaId = replicaId,
  fetchOnlyFromLeader = fetchOnlyFromLeader,
  readOnlyCommitted = fetchOnlyCommitted,
  fetchMaxBytes = fetchMaxBytes,
  hardMaxBytesLimit = hardMaxBytesLimit,
  readPartitionInfo = fetchInfos,
  quota = quota,
  isolationLevel = isolationLevel)
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // fetch 
time gets updated here, but mayBeShrinkIsr should have been already called and 
the replica is removed from sir
 else result
 }

val logReadResults = readFromLog()
{code}
Attached the graphs of disk weighted io time stats when this issue occurred.

I will raise a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
 describing options on how to handle this scenario.

 


> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts’s disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a 

[jira] [Commented] (KAFKA-8716) broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 2.2.1 or 2.3.0

2019-07-30 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896743#comment-16896743
 ] 

Ismael Juma commented on KAFKA-8716:


The documentation is out of date. I'll submit a PR to update it.

> broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 
> 2.2.1 or 2.3.0
> 
>
> Key: KAFKA-8716
> URL: https://issues.apache.org/jira/browse/KAFKA-8716
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Yu Yang
>Priority: Critical
>
> We are trying to upgrade kafka binary from 2.1 to 2.2.1 or 2.3.0. For both 
> versions, the broker with updated binary (2.2.1 or 2.3.0) could not get 
> started due to zookeeper session expiration exception.   This error happens 
> repeatedly and the broker could not start because of this. 
> Below is our zk related setting in server.properties:
> {code}
> zookeeper.connection.timeout.ms=6000
> zookeeper.session.timeout.ms=6000
> {code}
> The following is the stack trace, and we are using zookeeper 3.5.3. Instead 
> of waiting for a few seconds, the SESSIONEXPIRED error returned immediately 
> in CheckedEphemeral.create call.  Any insights? 
> [2019-07-25 18:07:35,712] INFO Creating /brokers/ids/80 (is it secure? false) 
> (kafka.zk.KafkaZkClient)
> [2019-07-25 18:07:35,724] ERROR Error while creating ephemeral at 
> /brokers/ids/80 with return code: SESSIONEXPIRED 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> [2019-07-25 18:07:35,731] ERROR [KafkaServer id=80] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
> at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1725)
> at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:260)
> at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:75)
> at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8716) broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 2.2.1 or 2.3.0

2019-07-30 Thread Xu Zhiyuan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896733#comment-16896733
 ] 

Xu Zhiyuan commented on KAFKA-8716:
---

[~yuyang08] Although Kafka bundles a newer version of ZooKeeper in the binary, the
Kafka documentation says that the current stable branch of ZooKeeper is 3.4 and that
the latest release of that branch is 3.4.9. Maybe 3.4.9 is a choice.

[zookeeper|https://kafka.apache.org/23/documentation.html#zk]

> broker cannot join the cluster after upgrading kafka binary from 2.1.1 to 
> 2.2.1 or 2.3.0
> 
>
> Key: KAFKA-8716
> URL: https://issues.apache.org/jira/browse/KAFKA-8716
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Yu Yang
>Priority: Critical
>
> We are trying to upgrade kafka binary from 2.1 to 2.2.1 or 2.3.0. For both 
> versions, the broker with updated binary (2.2.1 or 2.3.0) could not get 
> started due to zookeeper session expiration exception.   This error happens 
> repeatedly and the broker could not start because of this. 
> Below is our zk related setting in server.properties:
> {code}
> zookeeper.connection.timeout.ms=6000
> zookeeper.session.timeout.ms=6000
> {code}
> The following is the stack trace, and we are using zookeeper 3.5.3. Instead 
> of waiting for a few seconds, the SESSIONEXPIRED error returned immediately 
> in CheckedEphemeral.create call.  Any insights? 
> [2019-07-25 18:07:35,712] INFO Creating /brokers/ids/80 (is it secure? false) 
> (kafka.zk.KafkaZkClient)
> [2019-07-25 18:07:35,724] ERROR Error while creating ephemeral at 
> /brokers/ids/80 with return code: SESSIONEXPIRED 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> [2019-07-25 18:07:35,731] ERROR [KafkaServer id=80] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode 
> = Session expired
> at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
> at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1725)
> at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1689)
> at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:97)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:260)
> at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:75)
> at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8738) Cleaning thread blocked when more than one ALTER_REPLICA_LOG_DIRS requests sent

2019-07-30 Thread dingsainan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dingsainan updated KAFKA-8738:
--
Description: 
Hi,
  
I am experiencing a situation where the log cleaner does not work for the affected
topic-partition after using the kafka-reassign-partitions.sh tool on v2.1.1 more
than once in quick succession.

My operation:
We first submit a task to migrate a replica between log directories on the same
broker; while the previous task is still in progress, we submit a new task for the
same topic-partition.

 
{code:java}
// the first task:
{"partitions":
[{"topic": "lancer_ops_billions_all_log_json_billions",
  "partition": 1,
  "replicas": [6,15],
  "log_dirs": ["any","/data/mnt/storage02/datum/kafka_data"]}]
}

//the second task
{"partitions":
[{"topic": "lancer_ops_billions_all_log_json_billions",
  "partition": 1,
  "replicas": [6,15],
  "log_dirs": ["any","/data/mnt/storage03/datum/kafka_data"]}]
}
 
{code}
 
My analysis:
Kafka executes abortAndPauseCleaning() once a task is submitted; shortly afterwards
another task is submitted for the same topic-partition, so the cleaning state becomes
LogCleaningPaused(2). When the second task completes, cleaning is resumed once for
this topic-partition. In my case the first task was killed directly and no
resumeCleaning() was executed for it, so when the second task completes the cleaning
state for the topic-partition is still LogCleaningPaused(1), which blocks the cleaner
thread for that topic-partition.
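For illustration, a minimal sketch (simplified names, not the actual LogCleanerManager code) of how a pause count that is incremented by each abort-and-pause but decremented only once leaves the partition paused:

{code:java}
// Simplified sketch of a pause-count mechanism; names are illustrative only.
import java.util.concurrent.atomic.AtomicInteger;

class CleanerPauseSketch {
    private final AtomicInteger pausedCount = new AtomicInteger(0);

    void abortAndPauseCleaning() {        // called for each ALTER_REPLICA_LOG_DIRS task
        pausedCount.incrementAndGet();    // first task -> 1, second task -> 2
    }

    void resumeCleaning() {               // called when a task completes
        pausedCount.decrementAndGet();    // only the second task resumes -> count stays at 1
    }

    boolean cleaningAllowed() {           // the cleaner skips the partition while > 0
        return pausedCount.get() == 0;
    }
}
{code}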
  
 _That's all of my analysis; please confirm._
  
 _Thanks_
 _Nora_

  was:
Hi,
 
I am experiencing one situation  that the log cleaner dose not work  for the 
related topic-partition when using --kafka-reassign-partitions.sh tool for 
V2.1.1 for more than one time frequently.
 
My operation:
submitting one task for migration replica in one same broker first,  when the 
previous task still in progress, we submit one new task for the same 
topic-partition.
 
My search:
Kafka executes abortAndPauseCleaning() once task is submitted, shortly, another 
task is submitted for the same topic-partition, so the clean thread status is 
{color:#ff}LogCleaningPaused(2){color} currently. When the second task 
completed, the clean thread will be resumed for this topic-partition once. In 
my case, the previous task is killed directly, no resumeClean() is executed for 
the first task, so when the second task is completed, the clean status for the 
topic-partition is still {color:#ff}LogCleaningPaused(1){color}, which 
blocks the clean thread for the topic-partition.
 
_That's all my search, please confirm._
 
_Thanks_
_Nora_


> Cleaning thread blocked  when more than one ALTER_REPLICA_LOG_DIRS requests 
> sent
> 
>
> Key: KAFKA-8738
> URL: https://issues.apache.org/jira/browse/KAFKA-8738
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.1
>Reporter: dingsainan
>Priority: Major
>
> Hi,
>   
>  I am experiencing one situation  that the log cleaner dose not work  for the 
> related topic-partition when using --kafka-reassign-partitions.sh tool for 
> V2.1.1 for more than one time frequently.
>   
>  My operation:
>  submitting one task for migration replica in one same broker first,  when 
> the previous task still in progress, we submit one new task for the same 
> topic-partition.
>  
> {code:java}
> // the first task:
> {"partitions":
> [{"topic": "lancer_ops_billions_all_log_json_billions",
>   "partition": 1,
>   "replicas": [6,15],
>   "log_dirs": ["any","/data/mnt/storage02/datum/kafka_data"]}]
> }
> //the second task
> {"partitions":
> [{"topic": "lancer_ops_billions_all_log_json_billions",
>   "partition": 1,
>   "replicas": [6,15],
>   "log_dirs": ["any","/data/mnt/storage03/datum/kafka_data"]}]
> }
>  
> {code}
>  
>  My search:
>  Kafka executes abortAndPauseCleaning() once task is submitted, shortly, 
> another task is submitted for the same topic-partition, so the clean thread 
> status is {color:#ff}LogCleaningPaused(2){color} currently. When the 
> second task completed, the clean thread will be resumed for this 
> topic-partition once. In my case, the previous task is killed directly, no 
> resumeClean() is executed for the first task, so when the second task is 
> completed, the clean status for the topic-partition is still 
> {color:#ff}LogCleaningPaused(1){color}, which blocks the clean thread for 
> the topic-partition.
>   
>  _That's all my search, please confirm._
>   
>  _Thanks_
>  _Nora_



--
This message was sent by Atlassian JIRA

[jira] [Created] (KAFKA-8738) Cleaning thread blocked when more than one ALTER_REPLICA_LOG_DIRS requests sent

2019-07-30 Thread dingsainan (JIRA)
dingsainan created KAFKA-8738:
-

 Summary: Cleaning thread blocked  when more than one 
ALTER_REPLICA_LOG_DIRS requests sent
 Key: KAFKA-8738
 URL: https://issues.apache.org/jira/browse/KAFKA-8738
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.1
Reporter: dingsainan


Hi,
 
I am experiencing a situation where the log cleaner does not work for the affected
topic-partition after using the kafka-reassign-partitions.sh tool on v2.1.1 more
than once in quick succession.

My operation:
We first submit a task to migrate a replica between log directories on the same
broker; while the previous task is still in progress, we submit a new task for the
same topic-partition.
 
My analysis:
Kafka executes abortAndPauseCleaning() once a task is submitted; shortly afterwards
another task is submitted for the same topic-partition, so the cleaning state becomes
LogCleaningPaused(2). When the second task completes, cleaning is resumed once for
this topic-partition. In my case the first task was killed directly and no
resumeCleaning() was executed for it, so when the second task completes the cleaning
state for the topic-partition is still LogCleaningPaused(1), which blocks the cleaner
thread for that topic-partition.
 
_That's all of my analysis; please confirm._
 
_Thanks_
_Nora_



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8737) TaskMigrated Exception while rebalancing kafka streams

2019-07-30 Thread KUMAR (JIRA)
KUMAR created KAFKA-8737:


 Summary: TaskMigrated Exception while rebalancing kafka streams
 Key: KAFKA-8737
 URL: https://issues.apache.org/jira/browse/KAFKA-8737
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.1, 1.0.0
 Environment: 20 partitions, 1 topic, 8 streamer service instances. Consumer lag snapshot for topic-region-1 (partition, current offset, log end offset, lag, consumer id / client id):
topic-region-1  9   7841726  8236017  394291  streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663 / streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
topic-region-1  15  7421710  7467666  45956   streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663 / streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
topic-region-1  19  7737360  8120611  383251  streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer-0276e83d-40b5-4b44-b764-7d29e0dab663 / streams-subscriberstopic-region-1-29d615ed-4243-4b9d-90b7-9c517aa0f2e3-StreamThread-1-consumer
topic-region-1
Reporter: KUMAR


Kafka Streams throws the following exception during restart of a stream client service:

o.a.k.s.p.internals.StreamThread.? - stream-thread 
[streams-subscriberstopic-region-1-32d968e3-f892-4772-a7a4-6f684d7e43c9-StreamThread-1]
 Detected a task that got migrated to another thread. This implies that this 
thread missed a rebalance and dropped out of the consumer group. Trying to 
rejoin the consumer group now.
org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of 
topic-region-1-12 should not change while restoring: old end offset 6286727, 
current offset 6380997

 

Kafka version is 1.0.0 and we have back-merged the fix for KAFKA-6269:

[https://github.com/apache/kafka/pull/4300/files]

However, we observe that there seems to be an issue in rebalancing when
"auto.offset.reset" is configured as "latest". Based on log analysis we see the
following behavior:
 # StreamThread starts a restore consumer.
 # While fetching, it gets an offset-out-of-range response:
o.a.k.c.consumer.internals.Fetcher.? - [Consumer
clientId=streams-subscriberstopic-region-1-11b2d7fb-11ce-4b0b-a40a-388d3c7b6bc9-StreamThread-1-restore-consumer,
groupId=] Fetch READ_UNCOMMITTED at offset 246431 for partition topic-region-1-12
returned fetch data (error=OFFSET_OUT_OF_RANGE, highWaterMark=-1, lastStableOffset = -1,
logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
 # Fetcher tries to reset the offset.
 # While resetting the offset, it appears to change the consumer's position, causing
the TaskMigrated exception.

The same test repeated with "auto.offset.reset" configured as "earliest" does not
throw any TaskMigrated exception, since in the earliest case we are not resetting
the restore consumer's position.
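For reference, a minimal sketch of the setting being compared; the bootstrap address and the exact property layout below are placeholders/assumptions, not taken from this report:

{code:java}
import java.util.Properties;

public class OffsetResetConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "streams-subscriberstopic-region-1");
        props.put("bootstrap.servers", "broker:9092");   // placeholder address
        // With "latest", the restore consumer's position is reset on OFFSET_OUT_OF_RANGE,
        // which is where the end offset appears to change during restoration.
        // With "earliest", we do not observe the TaskMigratedException.
        props.put("auto.offset.reset", "earliest");
    }
}
{code}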

 

Please let us know if this is possible and if a fix would be needed for the 
offset reset piece when set to latest.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8698) ListOffsets Response protocol documentation

2019-07-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896663#comment-16896663
 ] 

ASF GitHub Bot commented on KAFKA-8698:
---

asutosh936 commented on pull request #7141: KAFKA-8698 : Updated documentation 
to remove typo error.
URL: https://github.com/apache/kafka/pull/7141
 
 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ListOffsets Response protocol documentation
> ---
>
> Key: KAFKA-8698
> URL: https://issues.apache.org/jira/browse/KAFKA-8698
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Fábio Silva
>Assignee: Asutosh Pandya
>Priority: Minor
>  Labels: documentation
>
> The documentation of ListOffsets Response (Version: 0) appears to have an 
> typo on offsets field name, suffixed with `'`.
> {code:java}
> [offsets']{code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-8698) ListOffsets Response protocol documentation

2019-07-30 Thread Asutosh Pandya (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asutosh Pandya reassigned KAFKA-8698:
-

Assignee: Asutosh Pandya

> ListOffsets Response protocol documentation
> ---
>
> Key: KAFKA-8698
> URL: https://issues.apache.org/jira/browse/KAFKA-8698
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Fábio Silva
>Assignee: Asutosh Pandya
>Priority: Minor
>  Labels: documentation
>
> The documentation of ListOffsets Response (Version: 0) appears to have an 
> typo on offsets field name, suffixed with `'`.
> {code:java}
> [offsets']{code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-07-30 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896629#comment-16896629
 ] 

Sophie Blee-Goldman commented on KAFKA-8736:


Thanks for the ticket! Would you be interested in opening a PR with this fix?

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Priority: Major
> Attachments: size.patch
>
>
> While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
> performance improvement. The test showed we were spending 80% of CPU time in 
> ConcurrentSkipListMap.size():
>  
> {noformat}
> 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
> 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
> 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
> 96.84% 
> org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
> 96.83% 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
> 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
> 96.3% 
> org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object,
>  java.lang.Object):87
> 96.3% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object):133
> 96.3% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.3% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
>  java.lang.Object, java.lang.Object):201
> 96.23% 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
>  java.lang.Object):117
> 96.12% 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object,
>  java.lang.Object):43
> 96.12% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object):133
> 96.12% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.12% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
>  java.lang.Object, java.lang.Object):201
> 96.08% 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
>  java.lang.Object):117
> 82.78% 
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object,
>  java.lang.Object):169
> 82.78% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed,
>  java.lang.Object):612
> 82.59% 
> org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
>  java.lang.Object):127
> 81.11% 
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
>  java.lang.Object):35
> 81.09% 
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
>  byte[]):131
> 81.09% 
> org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, 
> org.apache.kafka.common.utils.Bytes, 
> org.apache.kafka.streams.state.internals.LRUCacheEntry):151
> 80.53% 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
> 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
> 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat}
> According to 
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
>  the size method has to traverse all elements to get a count. It looks like 
> the count is being compared against 0 to determine if the map is empty; In 
> this case, we don't need a full count. Instead, the isEmpty() method should 
> be used, which just looks for one node. We patched this and gained about 25% 
> max throughput, and this method disappeared from thread dumps as a hot spot.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-07-30 Thread Matthew Jarvie (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Jarvie updated KAFKA-8736:
--
Attachment: size.patch

> Performance: ThreadCache uses size() for empty cache check
> --
>
> Key: KAFKA-8736
> URL: https://issues.apache.org/jira/browse/KAFKA-8736
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Matthew Jarvie
>Priority: Major
> Attachments: size.patch
>
>
> While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
> performance improvement. The test showed we were spending 80% of CPU time in 
> ConcurrentSkipListMap.size():
>  
> {noformat}
> 100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
> 100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
> 96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
> 96.84% 
> org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
> 96.83% 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
> 96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
> 96.3% 
> org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object,
>  java.lang.Object):87
> 96.3% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object):133
> 96.3% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.3% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
>  java.lang.Object, java.lang.Object):201
> 96.23% 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
>  java.lang.Object):117
> 96.12% 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object,
>  java.lang.Object):43
> 96.12% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object):133
> 96.12% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
>  java.lang.Object, org.apache.kafka.streams.processor.To):180
> 96.12% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
>  java.lang.Object, java.lang.Object):201
> 96.08% 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
>  java.lang.Object):117
> 82.78% 
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object,
>  java.lang.Object):169
> 82.78% 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed,
>  java.lang.Object):612
> 82.59% 
> org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
>  java.lang.Object):127
> 81.11% 
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
>  java.lang.Object):35
> 81.09% 
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
>  byte[]):131
> 81.09% 
> org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, 
> org.apache.kafka.common.utils.Bytes, 
> org.apache.kafka.streams.state.internals.LRUCacheEntry):151
> 80.53% 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
> 80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
> 80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat}
> According to 
> [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
>  the size method has to traverse all elements to get a count. It looks like 
> the count is being compared against 0 to determine if the map is empty; In 
> this case, we don't need a full count. Instead, the isEmpty() method should 
> be used, which just looks for one node. We patched this and gained about 25% 
> max throughput, and this method disappeared from thread dumps as a hot spot.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8736) Performance: ThreadCache uses size() for empty cache check

2019-07-30 Thread Matthew Jarvie (JIRA)
Matthew Jarvie created KAFKA-8736:
-

 Summary: Performance: ThreadCache uses size() for empty cache check
 Key: KAFKA-8736
 URL: https://issues.apache.org/jira/browse/KAFKA-8736
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.3.0
Reporter: Matthew Jarvie
 Attachments: size.patch

While load testing Kafka Streams in 2.3.0, we stumbled across a potential 
performance improvement. The test showed we were spending 80% of CPU time in 
ConcurrentSkipListMap.size():

 
{noformat}
100% org.apache.kafka.streams.processor.internals.StreamThread.run():774
100% org.apache.kafka.streams.processor.internals.StreamThread.runLoop():805
96.84% org.apache.kafka.streams.processor.internals.StreamThread.runOnce():890
96.84% 
org.apache.kafka.streams.processor.internals.TaskManager.process(long):420
96.83% 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(long):199
96.4% org.apache.kafka.streams.processor.internals.StreamTask.process():366
96.3% 
org.apache.kafka.streams.processor.internals.SourceNode.process(java.lang.Object,
 java.lang.Object):87
96.3% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
 java.lang.Object):133
96.3% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
 java.lang.Object, org.apache.kafka.streams.processor.To):180
96.3% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
 java.lang.Object, java.lang.Object):201
96.23% 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
 java.lang.Object):117
96.12% 
org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(java.lang.Object,
 java.lang.Object):43
96.12% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
 java.lang.Object):133
96.12% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(java.lang.Object,
 java.lang.Object, org.apache.kafka.streams.processor.To):180
96.12% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(org.apache.kafka.streams.processor.internals.ProcessorNode,
 java.lang.Object, java.lang.Object):201
96.08% 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(java.lang.Object,
 java.lang.Object):117
82.78% 
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(java.lang.Object,
 java.lang.Object):169
82.78% 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$SessionStoreReadWriteDecorator.put(org.apache.kafka.streams.kstream.Windowed,
 java.lang.Object):612
82.59% 
org.apache.kafka.streams.state.internals.MeteredSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
 java.lang.Object):127
81.11% 
org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
 java.lang.Object):35
81.09% 
org.apache.kafka.streams.state.internals.CachingSessionStore.put(org.apache.kafka.streams.kstream.Windowed,
 byte[]):131
81.09% 
org.apache.kafka.streams.state.internals.ThreadCache.put(java.lang.String, 
org.apache.kafka.common.utils.Bytes, 
org.apache.kafka.streams.state.internals.LRUCacheEntry):151
80.53% 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(java.lang.String):238
80.53% org.apache.kafka.streams.state.internals.NamedCache.size():266
80.53% java.util.concurrent.ConcurrentSkipListMap.size():1639{noformat}
According to
[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#size--],
the size method has to traverse all elements to get a count. The count is being
compared against 0 to determine whether the map is empty; in this case we don't need
a full count. Instead, the isEmpty() method should be used, which only needs to look
for one node. We patched this and gained about 25% in maximum throughput, and this
method disappeared from thread dumps as a hot spot.
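For illustration, a minimal sketch (simplified, not the actual ThreadCache/NamedCache code) of the difference between the two emptiness checks:

{code:java}
import java.util.concurrent.ConcurrentSkipListMap;

class EmptyCheckSketch {
    private final ConcurrentSkipListMap<String, byte[]> cache = new ConcurrentSkipListMap<>();

    // Before: size() walks every entry of the skip list just to compare the total with 0.
    boolean emptyViaSize() {
        return cache.size() == 0;
    }

    // After: isEmpty() only needs to find (or fail to find) a first node.
    boolean emptyViaIsEmpty() {
        return cache.isEmpty();
    }
}
{code}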



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself

2019-07-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896526#comment-16896526
 ] 

ASF GitHub Bot commented on KAFKA-8735:
---

qinghui-xu commented on pull request #7139: KAFKA-8735: Check properties file 
existence first
URL: https://github.com/apache/kafka/pull/7139
 
 
   To make BrokerMetadataCheckpoint more robust, and avoid a leak of 
abstraction.
   
   Details and rationales are in the jira ticket.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> BrokerMetadataCheckPoint should check metadata.properties existence itself 
> ---
>
> Key: KAFKA-8735
> URL: https://issues.apache.org/jira/browse/KAFKA-8735
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Qinghui Xu
>Priority: Major
>
> BrokerMetadataCheckPoint tries to read metadata.properties from log directory 
> during server start up. And it relies on org.apache.kafka.common.util.Utils 
> (from org.apache.kafka:kafka-clients) to load the properties file in a given 
> directory.
> During the process, we need to handle the case in which the properties file 
> does not exist (not as an error). Currently, BrokerMetadataCheckPoint relies 
> on the behavior of  `org.apache.kafka.common.util.Utils#loadProps` to find 
> out if the file exists or not: if the properties file is absent, it is 
> expecting NoSuchFileException (for branch 2.1 and above), and it was 
> expecting FileNotFoundException (for branch 2.0 and before). Knowing that 
> `org.apache.kafka.common.util.Utils#loadProps` signature throws only 
> IOException, this exception pattern matching is thus sort of leak of 
> abstraction making BrokerMetadataCheckPoint relies on the implementation 
> details of `org.apache.kafka.common.util.Utils#loadProps`. 
> This makes BrokerMetadataCheckPoint very fragile, especially when 
> `org.apache.kafka.common.util.Utils` and 
> `kafka.server.BrokerMetadataCheckPoint` are from different artifacts, an 
> example that I just ran into:
>  * We have a project A that depends on project B, and project B has a compile 
> time dependency on `org.apache.kafka:kafka-clients`. A is relying 
> `org.apach.kafka:kafka_2.11` in its tests: it will spawn some kafka brokers 
> in the tests.
>  * At first A and B are both using kafka libraries 2.0.1, and everything is 
> working fine
>  * At some point a newer version of B upgrades 
> `org.apache.kafka:kafka-clients` to 2.3.0
>  * When A wants to use the newer version of B, its tests are broken because 
> kafka brokers fail to start: now `org.apache.kafka.common.util.Utils` (2.3.0) 
> throws NoSucheFileException while BrokerMetadataCheckPoint (2.0.1) expects to 
> catch FileNotFoundException
> It would be much more reliable for BrokerMetadataCheckPoint to check the file 
> existence before trying to load the properties from the file.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself

2019-07-30 Thread Qinghui Xu (JIRA)
Qinghui Xu created KAFKA-8735:
-

 Summary: BrokerMetadataCheckPoint should check metadata.properties 
existence itself 
 Key: KAFKA-8735
 URL: https://issues.apache.org/jira/browse/KAFKA-8735
 Project: Kafka
  Issue Type: Improvement
Reporter: Qinghui Xu


BrokerMetadataCheckPoint tries to read metadata.properties from the log directory
during server startup, and it relies on org.apache.kafka.common.utils.Utils (from
org.apache.kafka:kafka-clients) to load the properties file in a given directory.

During this process we need to handle the case in which the properties file does not
exist (not as an error). Currently, BrokerMetadataCheckPoint relies on the behavior
of `org.apache.kafka.common.utils.Utils#loadProps` to find out whether the file
exists: if the properties file is absent, it expects NoSuchFileException (for branch
2.1 and above), and it used to expect FileNotFoundException (for branch 2.0 and
before). Given that the `Utils#loadProps` signature declares only IOException, this
exception pattern matching is a leak of abstraction that makes
BrokerMetadataCheckPoint depend on the implementation details of `Utils#loadProps`.

This makes BrokerMetadataCheckPoint very fragile, especially when
`org.apache.kafka.common.utils.Utils` and `kafka.server.BrokerMetadataCheckPoint`
are from different artifacts. An example that I just ran into:
 * We have a project A that depends on project B, and project B has a compile-time
dependency on `org.apache.kafka:kafka-clients`. A relies on
`org.apache.kafka:kafka_2.11` in its tests: it spawns some Kafka brokers in the tests.
 * At first A and B are both using Kafka libraries 2.0.1, and everything works fine.
 * At some point a newer version of B upgrades `org.apache.kafka:kafka-clients` to 2.3.0.
 * When A wants to use the newer version of B, its tests break because the Kafka
brokers fail to start: now `org.apache.kafka.common.utils.Utils` (2.3.0) throws
NoSuchFileException while BrokerMetadataCheckPoint (2.0.1) expects to catch
FileNotFoundException.

It would be much more reliable for BrokerMetadataCheckPoint to check the file 
existence before trying to load the properties from the file.
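For illustration, a minimal sketch of the proposed check (the helper method and the file name here follow this ticket's wording and are illustrative, not the actual BrokerMetadataCheckPoint code):

{code:java}
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.utils.Utils;

class BrokerMetadataCheckpointSketch {
    // Check for the file up front instead of pattern-matching on the exception type
    // that a particular version of Utils.loadProps happens to throw.
    static Optional<Properties> readMetadataProperties(File logDir) throws IOException {
        File propsFile = new File(logDir, "metadata.properties");
        if (!propsFile.exists()) {
            return Optional.empty();   // a missing file means "no metadata yet", not an error
        }
        return Optional.of(Utils.loadProps(propsFile.getAbsolutePath()));
    }
}
{code}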



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-07-30 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896500#comment-16896500
 ] 

Matthias J. Sax commented on KAFKA-8705:


[~bbejeck] Is 2.3 the only affected version?

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
> Key: KAFKA-8705
> URL: https://issues.apache.org/jira/browse/KAFKA-8705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Hiroshi Nakahara
>Assignee: Bill Bejeck
>Priority: Major
>
> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode.
> Kafka Stream version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> final StreamsBuilder streamsBuilder = new StreamsBuilder();
> final KStream parentStream = 
> streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), 
> Serdes.Integer()))
> .selectKey(Integer::sum);  // To make parentStream 
> KeyChaingingPoint
> final KStream childStream1 = 
> parentStream.mapValues(v -> v + 1);
> final KStream childStream2 = 
> parentStream.mapValues(v -> v + 2);
> final KStream childStream3 = 
> parentStream.mapValues(v -> v + 3);
> childStream1
> .merge(childStream2)
> .merge(childStream3)
> .to("outputTopic");
> final Properties properties = new Properties();
> properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
> StreamsConfig.OPTIMIZE);
> streamsBuilder.build(properties);
> }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create Topology without throwing Exception.  The 
> expected topology is:
> {code:java}
> Topologies:
>Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>   --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>   --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, 
> KSTREAM-MAPVALUES-04
>   <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-MAPVALUES-02 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-03 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-04 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MERGE-05 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
> Processor: KSTREAM-MERGE-06 (stores: [])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
> Sink: KSTREAM-SINK-07 (topic: outputTopic)
>   <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChaingingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
> final Map> 
> mergeNodesToKeyChangers = new HashMap<>();
> for (final StreamsGraphNode mergeNode : mergeNodes) {
> mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
> final Collection keys = 
> 

[jira] [Created] (KAFKA-8734) Remove PartitionAssignorAdapter and deprecated PartitionAssignor interface

2019-07-30 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8734:
--

 Summary: Remove PartitionAssignorAdapter and deprecated 
PartitionAssignor interface
 Key: KAFKA-8734
 URL: https://issues.apache.org/jira/browse/KAFKA-8734
 Project: Kafka
  Issue Type: Task
  Components: clients
Affects Versions: 3.0.0
Reporter: Sophie Blee-Goldman


In 2.4 we deprecated the consumer.internals.PartitionAssignor interface and migrated
all assignors to the [new public consumer.ConsumerPartitionAssignor
interface|https://github.com/apache/kafka/pull/7108]. Although the old interface was
internal, we provided an [adapter|https://github.com/apache/kafka/pull/7110] for
those who may have implemented a custom PartitionAssignor, to avoid breaking changes.
Both should be removed in the next major release.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup

2019-07-30 Thread Bruno Cadonna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896370#comment-16896370
 ] 

Bruno Cadonna commented on KAFKA-7937:
--

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6601/testReport/

> Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
> 
>
> Key: KAFKA-7937
> URL: https://issues.apache.org/jira/browse/KAFKA-7937
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.2.0, 2.1.1, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Critical
> Fix For: 2.4.0
>
> Attachments: log-job6122.txt
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline
> {quote}kafka.admin.ResetConsumerGroupOffsetTest > 
> testResetOffsetsNotExistingGroup FAILED 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available. at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
>  Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: 
> The coordinator is not available.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic

2019-07-30 Thread Bruno Cadonna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896369#comment-16896369
 ] 

Bruno Cadonna commented on KAFKA-8589:
--

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6601/

> Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
> --
>
> Key: KAFKA-8589
> URL: https://issues.apache.org/jira/browse/KAFKA-8589
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, clients, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull]
> *20:25:15* 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15*
>  *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > 
> testResetOffsetsExistingTopic FAILED*20:25:15* 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available.*20:25:15* at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15*
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15*
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15*
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15*
>  at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15*
>  at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15*
>  at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15*
>  at scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15*   
>   at scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* 
> at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15*  
>at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15*  
>at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15*   
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15*   
>   at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15*
>  at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15*
>  at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15*
>  at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15*
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15*
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15*
>  at 
> kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15*
>  *20:25:15* Caused by:*20:25:15* 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available.*20*



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8686) Flakey test ExampleConnectIntegrationTest#testSinkConnector

2019-07-30 Thread Bruno Cadonna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896366#comment-16896366
 ] 

Bruno Cadonna commented on KAFKA-8686:
--

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6601/testReport/

> Flakey test ExampleConnectIntegrationTest#testSinkConnector
> ---
>
> Key: KAFKA-8686
> URL: https://issues.apache.org/jira/browse/KAFKA-8686
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect, unit tests
>Affects Versions: 2.4.0
>Reporter: Boyang Chen
>Priority: Major
>  Labels: flaky-test
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/429/console
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector.test.stdout*20:09:20*
>  *20:09:20* 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > 
> testSinkConnector FAILED*20:09:20* java.lang.AssertionError: Condition 
> not met within timeout 15000. Connector tasks were not assigned a partition 
> each.*20:09:20* at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)*20:09:20*
>  at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353)*20:09:20*
>  at 
> org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSinkConnector(ExampleConnectIntegrationTest.java:128)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-07-30 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8733:
--
Description: 
We found offline partitions issue multiple times on some of the hosts in our 
clusters. After going through the broker logs and hosts’s disk stats, it looks 
like this issue occurs whenever the read/write operations take more time on 
that disk. In a particular case where read time is more than the 
replica.lag.time.max.ms, follower replicas will be out of sync as their earlier 
fetch requests are stuck while reading the local log and their fetch status is 
not yet updated as mentioned in the below code of `ReplicaManager`. If there is 
an issue in reading the data from the log for a duration more than 
replica.lag.time.max.ms then all the replicas will be out of sync and partition 
becomes offline if min.isr.replicas > 1 and unclean.leader.election is disabled.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than 
`replica.lag.time.max.ms`
  replicaId = replicaId,
  fetchOnlyFromLeader = fetchOnlyFromLeader,
  readOnlyCommitted = fetchOnlyCommitted,
  fetchMaxBytes = fetchMaxBytes,
  hardMaxBytesLimit = hardMaxBytesLimit,
  readPartitionInfo = fetchInfos,
  quota = quota,
  isolationLevel = isolationLevel)
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // fetch 
time gets updated here, but mayBeShrinkIsr should have been already called and 
the replica is removed from sir
 else result
 }

val logReadResults = readFromLog()
{code}
Attached the graphs of disk weighted io time stats when this issue occurred.

I will raise a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
 describing options on how to handle this scenario.

 

  was:
We have seen the offline-partitions issue multiple times on some of the hosts in 
our clusters. After going through the broker logs and the hosts' disk stats, it 
looks like this issue occurs whenever read/write operations on that disk take 
too long. In a particular case where the read time exceeds 
replica.lag.time.max.ms, follower replicas fall out of sync because their 
earlier fetch requests are stuck reading the local log and their fetch status 
has not yet been updated, as shown in the `ReplicaManager` code below. If 
reading data from the log takes longer than replica.lag.time.max.ms, all the 
replicas fall out of sync and the partition becomes offline if 
min.insync.replicas > 1 and unclean.leader.election.enable is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than 
`replica.lag.time.max.ms`
  replicaId = replicaId,
  fetchOnlyFromLeader = fetchOnlyFromLeader,
  readOnlyCommitted = fetchOnlyCommitted,
  fetchMaxBytes = fetchMaxBytes,
  hardMaxBytesLimit = hardMaxBytesLimit,
  readPartitionInfo = fetchInfos,
  quota = quota,
  isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}

val logReadResults = readFromLog()
{code}
I will raise a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
 describing options on how to handle this scenario.

 


> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out 

[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-07-30 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8733:
--
Attachment: wio-time.png
weighted-io-time-2.png

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We have seen the offline-partitions issue multiple times on some of the hosts 
> in our clusters. After going through the broker logs and the hosts' disk 
> stats, it looks like this issue occurs whenever read/write operations on that 
> disk take too long. In a particular case where the read time exceeds 
> replica.lag.time.max.ms, follower replicas fall out of sync because their 
> earlier fetch requests are stuck reading the local log and their fetch status 
> has not yet been updated, as shown in the `ReplicaManager` code below. If 
> reading data from the log takes longer than replica.lag.time.max.ms, all the 
> replicas fall out of sync and the partition becomes offline if 
> min.insync.replicas > 1 and unclean.leader.election.enable is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   // fetch time gets updated here, but maybeShrinkIsr may already have been
>   // called and the replica removed from the ISR
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>   else result
> }
> val logReadResults = readFromLog()
> {code}
> I will raise a 
> [KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
>  describing options on how to handle this scenario.
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Updated] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-07-30 Thread Satish Duggana (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana updated KAFKA-8733:
--
Description: 
We have seen the offline-partitions issue multiple times on some of the hosts in 
our clusters. After going through the broker logs and the hosts' disk stats, it 
looks like this issue occurs whenever read/write operations on that disk take 
too long. In a particular case where the read time exceeds 
replica.lag.time.max.ms, follower replicas fall out of sync because their 
earlier fetch requests are stuck reading the local log and their fetch status 
has not yet been updated, as shown in the `ReplicaManager` code below. If 
reading data from the log takes longer than replica.lag.time.max.ms, all the 
replicas fall out of sync and the partition becomes offline if 
min.insync.replicas > 1 and unclean.leader.election.enable is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  // this call took more than `replica.lag.time.max.ms`
  val result = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}

val logReadResults = readFromLog()
{code}
I will raise a 
[KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-501+Avoid+offline+partitions+in+the+edgcase+scenario+of+follower+fetch+requests+not+processed+in+time]
 describing options on how to handle this scenario.

 

  was:
We have seen the offline-partitions issue multiple times on some of the hosts in 
our clusters. After going through the broker logs and the hosts' disk stats, it 
looks like this issue occurs whenever read/write operations on that disk take 
too long. In a particular case where the read time exceeds 
replica.lag.time.max.ms, follower replicas fall out of sync because their 
earlier fetch requests are stuck reading the local log and their fetch status 
has not yet been updated, as shown in the `ReplicaManager` code below. If 
reading data from the log takes longer than replica.lag.time.max.ms, all the 
replicas fall out of sync and the partition becomes offline if 
min.insync.replicas > 1 and unclean.leader.election.enable is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog( // this call took more than 
`replica.lag.time.max.ms`
  replicaId = replicaId,
  fetchOnlyFromLeader = fetchOnlyFromLeader,
  readOnlyCommitted = fetchOnlyCommitted,
  fetchMaxBytes = fetchMaxBytes,
  hardMaxBytesLimit = hardMaxBytesLimit,
  readPartitionInfo = fetchInfos,
  quota = quota,
  isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}

val logReadResults = readFromLog()
{code}

I will raise a KIP describing options on how to handle this scenario.

 


> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
>
> We have seen the offline-partitions issue multiple times on some of the hosts 
> in our clusters. After going through the broker logs and the hosts' disk 
> stats, it looks like this issue occurs whenever read/write operations on that 
> disk take too long. In a particular case where the read time exceeds 
> replica.lag.time.max.ms, follower replicas fall out of sync because their 
> earlier fetch requests are stuck reading the local log and their fetch status 
> has not yet been updated, as shown in the `ReplicaManager` code below. If 
> reading data from the log takes longer than replica.lag.time.max.ms, all the 
> replicas fall out of sync and the partition becomes offline if 
> min.insync.replicas > 1 and unclean.leader.election.enable is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = 

[jira] [Created] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2019-07-30 Thread Satish Duggana (JIRA)
Satish Duggana created KAFKA-8733:
-

 Summary: Offline partitions occur when leader's disk is slow in 
reads while responding to follower fetch requests.
 Key: KAFKA-8733
 URL: https://issues.apache.org/jira/browse/KAFKA-8733
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.2, 2.4.0
Reporter: Satish Duggana
Assignee: Satish Duggana


We have seen the offline-partitions issue multiple times on some of the hosts in 
our clusters. After going through the broker logs and the hosts' disk stats, it 
looks like this issue occurs whenever read/write operations on that disk take 
too long. In a particular case where the read time exceeds 
replica.lag.time.max.ms, follower replicas fall out of sync because their 
earlier fetch requests are stuck reading the local log and their fetch status 
has not yet been updated, as shown in the `ReplicaManager` code below. If 
reading data from the log takes longer than replica.lag.time.max.ms, all the 
replicas fall out of sync and the partition becomes offline if 
min.insync.replicas > 1 and unclean.leader.election.enable is false.

 
{code:java}
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  // this call took more than `replica.lag.time.max.ms`
  val result = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    readOnlyCommitted = fetchOnlyCommitted,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    isolationLevel = isolationLevel)
  // fetch time gets updated here, but maybeShrinkIsr may already have been
  // called and the replica removed from the ISR
  if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
  else result
}

val logReadResults = readFromLog()
{code}

I will raise a KIP describing options on how to handle this scenario.

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8717) Use cached hw/lso offset metadata when reading from log

2019-07-30 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8717.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Use cached hw/lso offset metadata when reading from log
> ---
>
> Key: KAFKA-8717
> URL: https://issues.apache.org/jira/browse/KAFKA-8717
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.0
>
>
> The broker caches log offset metadata (e.g. segment position) for the high 
> watermark and last stable offset in order to avoid additional index lookups 
> when handling fetches. Currently this metadata is only used when determining 
> delayed fetch satisfaction. We can also use it when reading from the log in 
> order to avoid redundant index lookups.
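
As a rough illustration of the idea only (this is not the broker's actual 
`Log`/`LogOffsetMetadata` API; `OffsetMetadataSketch`, `LogSketch` and 
`metadataForUpperBound` are assumed names), a Scala sketch of reusing cached 
offset metadata for reads bounded by the high watermark:

{code:java}
// Illustrative sketch: keep the (offset, segment, byte position) computed when
// the high watermark was last advanced, so a later read bounded by the high
// watermark can start from that position without another index lookup.
case class OffsetMetadataSketch(offset: Long, segmentBaseOffset: Long, positionInSegment: Int)

class LogSketch {
  @volatile private var highWatermarkMetadata =
    OffsetMetadataSketch(offset = 0L, segmentBaseOffset = 0L, positionInSegment = 0)

  // Expensive path: translate an offset into a file position via the offset index.
  private def lookupViaIndex(offset: Long): OffsetMetadataSketch = {
    // ... binary search of the offset index, omitted in this sketch ...
    OffsetMetadataSketch(offset, segmentBaseOffset = 0L, positionInSegment = 0)
  }

  // Remember the full metadata whenever the high watermark advances.
  def advanceHighWatermark(metadata: OffsetMetadataSketch): Unit =
    highWatermarkMetadata = metadata

  // Cheap path: when the fetch upper bound is the high watermark itself,
  // reuse the cached metadata instead of hitting the index again.
  def metadataForUpperBound(offset: Long): OffsetMetadataSketch =
    if (offset == highWatermarkMetadata.offset) highWatermarkMetadata
    else lookupViaIndex(offset)
}
{code}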



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8717) Use cached hw/lso offset metadata when reading from log

2019-07-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896251#comment-16896251
 ] 

ASF GitHub Bot commented on KAFKA-8717:
---

hachikuji commented on pull request #7081: KAFKA-8717: Reuse cached offset 
metadata when reading from log
URL: https://github.com/apache/kafka/pull/7081
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use cached hw/lso offset metadata when reading from log
> ---
>
> Key: KAFKA-8717
> URL: https://issues.apache.org/jira/browse/KAFKA-8717
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> The broker caches log offset metadata (e.g. segment position) for the high 
> watermark and last stable offset in order to avoid additional index lookups 
> when handling fetches. Currently this metadata is only used when determining 
> delayed fetch satisfaction. We can also use it when reading from the log in 
> order to avoid redundant index lookups.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server

2019-07-30 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8442?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8442.

   Resolution: Fixed
Fix Version/s: 2.4.0

> Inconsistent ISR output in topic command when using --bootstrap-server
> --
>
> Key: KAFKA-8442
> URL: https://issues.apache.org/jira/browse/KAFKA-8442
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
> Fix For: 2.4.0
>
>
> If there is no leader for a partition, the Metadata API returns an empty ISR. 
> When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads 
> to the following output:
> {code}
> Topic:foo   PartitionCount:1    ReplicationFactor:2 
> Configs:segment.bytes=1073741824
> Topic: foo  Partition: 0    Leader: none    Replicas: 1,3   Isr: 
> {code}
> When using `--zookeeper`, we display the current ISR correctly:
> {code}
> Topic:foo   PartitionCount:1    ReplicationFactor:2 Configs:
> Topic: foo  Partition: 0    Leader: -1  Replicas: 1,3   Isr: 1
> {code}
> To avoid confusion, we should make this output consistent or at least not 
> misleading. We should either change the Metadata API to return the ISR when we 
> have it, or change the output of the topic command to `N/A` or something like 
> that.
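
A minimal Scala sketch of the second option (the `formatIsr` helper is 
hypothetical, not the actual `TopicCommand` code): render `N/A` when the 
partition has no leader and the ISR is therefore unknown, instead of printing 
an empty field:

{code:java}
object IsrDisplay {
  def formatIsr(isr: Seq[Int], hasLeader: Boolean): String =
    if (isr.nonEmpty) isr.mkString(",")
    else if (!hasLeader) "N/A"  // no leader => ISR is unknown, not empty
    else ""

  def main(args: Array[String]): Unit = {
    println(formatIsr(Seq.empty, hasLeader = false)) // N/A
    println(formatIsr(Seq(1), hasLeader = true))     // 1
  }
}
{code}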



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8442) Inconsistent ISR output in topic command when using --bootstrap-server

2019-07-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896248#comment-16896248
 ] 

ASF GitHub Bot commented on KAFKA-8442:
---

hachikuji commented on pull request #6836: KAFKA-8442:Inconsistent ISR output 
in topic command
URL: https://github.com/apache/kafka/pull/6836
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Inconsistent ISR output in topic command when using --bootstrap-server
> --
>
> Key: KAFKA-8442
> URL: https://issues.apache.org/jira/browse/KAFKA-8442
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
>
> If there is no leader for a partition, the Metadata API returns an empty ISR. 
> When using the `--bootstrap-server` option with `kafka-topics.sh`, this leads 
> to the following output:
> {code}
> Topic:foo   PartitionCount:1    ReplicationFactor:2 
> Configs:segment.bytes=1073741824
> Topic: foo  Partition: 0    Leader: none    Replicas: 1,3   Isr: 
> {code}
> When using `--zookeeper`, we display the current ISR correctly:
> {code}
> Topic:foo   PartitionCount:1    ReplicationFactor:2 Configs:
> Topic: foo  Partition: 0    Leader: -1  Replicas: 1,3   Isr: 1
> {code}
> To avoid confusion, we should make this output consistent or at least not 
> misleading. We should either change the Metadata API to return the ISR when we 
> have it, or change the output of the topic command to `N/A` or something like 
> that.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol

2019-07-30 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8640.

Resolution: Fixed

> Replace OffsetFetch request/response with automated protocol
> 
>
> Key: KAFKA-8640
> URL: https://issues.apache.org/jira/browse/KAFKA-8640
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8640) Replace OffsetFetch request/response with automated protocol

2019-07-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896230#comment-16896230
 ] 

ASF GitHub Bot commented on KAFKA-8640:
---

hachikuji commented on pull request #7062: KAFKA-8640: Replace OffsetFetch 
request with automated protocol
URL: https://github.com/apache/kafka/pull/7062
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace OffsetFetch request/response with automated protocol
> 
>
> Key: KAFKA-8640
> URL: https://issues.apache.org/jira/browse/KAFKA-8640
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8711) Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. testControlPlaneRequest

2019-07-30 Thread Chandrasekhar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896180#comment-16896180
 ] 

Chandrasekhar commented on KAFKA-8711:
--

Any feedback?

> Kafka 2.3.0 Transient Unit Test Failures SocketServerTest. 
> testControlPlaneRequest
> --
>
> Key: KAFKA-8711
> URL: https://issues.apache.org/jira/browse/KAFKA-8711
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Chandrasekhar
>Priority: Critical
> Attachments: KafkaAUTFailures07242019_PASS2.txt, 
> KafkaUTFailures07242019_PASS2.GIF
>
>
> Cloned Kafka 2.3.0 source to our git repo and compiled it using 'gradle 
> build'; we see the following error consistently:
> Gradle Version 4.7
>  
> testControlPlaneRequest
> java.net.BindException: Address already in use (Bind failed)
>     at java.net.PlainSocketImpl.socketBind(Native Method)
>     at 
> java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:387)
>     at java.net.Socket.bind(Socket.java:644)
>     at java.net.Socket.<init>(Socket.java:433)
>     at java.net.Socket.<init>(Socket.java:286)
>     at kafka.network.SocketServerTest.connect(SocketServerTest.scala:140)
>     at 
> kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1(SocketServerTest.scala:200)
>     at 
> kafka.network.SocketServerTest.$anonfun$testControlPlaneRequest$1$adapted(SocketServerTest.scala:199)
>     at 
> kafka.network.SocketServerTest.withTestableServer(SocketServerTest.scala:1141)
>     at 
> kafka.network.SocketServerTest.testControlPlaneRequest(SocketServerTest.scala:199)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>     at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>     at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>     at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>     at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>     at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>     at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>     at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>     at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>     at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>     at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>     at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>     at 
> 

[jira] [Created] (KAFKA-8732) specifying a non-existent broker to ./bin/kafka-reassign-partitions.sh leads to reassignment never getting completed.

2019-07-30 Thread Raunak (JIRA)
Raunak created KAFKA-8732:
-

 Summary: specifying a non-existent broker to 
./bin/kafka-reassign-partitions.sh leads to reassignment never getting 
completed.
 Key: KAFKA-8732
 URL: https://issues.apache.org/jira/browse/KAFKA-8732
 Project: Kafka
  Issue Type: Bug
  Components: controller, tools
Affects Versions: 0.10.1.1
 Environment: Ubuntu-VERSION="14.04.5 LTS"
Reporter: Raunak


Specifying a non-existent broker to ./bin/kafka-reassign-partitions.sh leads to 
reassignment never getting completed. 

My reassignment gets stuck if I provide a non-existent broker ID. My Kafka 
version is 0.10.1.1.

 

 
{code:java}
./kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file 
le.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"cv-topic","partition":0,"replicas":[1011131,101067,98,101240]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
{code}
Here, 98 is the non-existent broker. Deleting the reassign_partitions znode 
does not help either; when I describe the same topic, broker 98 is still shown 
as out of sync.

 

 
{code:java}
Topic:cv-topic PartitionCount:1 ReplicationFactor:4 Configs:
Topic: cv-topic Partition: 0 Leader: 1011131 Replicas: 1011131,101067,98,101240 
Isr: 1011131,101067,101240

{code}
Now 98 will always be out of sync.
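
A minimal Scala sketch of the kind of up-front check that would avoid this 
situation (the `invalidReplicas` helper is hypothetical, not the actual 
`ReassignPartitionsCommand` code): reject a proposed assignment that references 
broker ids not present in the cluster before writing anything to ZooKeeper:

{code:java}
object ReassignmentCheck {
  // Return, per partition, the proposed replica ids that do not correspond to
  // any known broker; a non-empty result means the assignment should be rejected.
  def invalidReplicas(proposed: Map[String, Seq[Int]],
                      liveBrokerIds: Set[Int]): Map[String, Seq[Int]] =
    proposed
      .map { case (partition, replicas) => partition -> replicas.filterNot(liveBrokerIds.contains) }
      .filter { case (_, missing) => missing.nonEmpty }

  def main(args: Array[String]): Unit = {
    val proposed = Map("cv-topic-0" -> Seq(1011131, 101067, 98, 101240))
    val live     = Set(1011131, 101067, 101240)
    println(invalidReplicas(proposed, live)) // Map(cv-topic-0 -> List(98))
  }
}
{code}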



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-821) specifying a non-existent partition to ./bin/kafka-reassign-partitions.sh breaks all reassignment ops

2019-07-30 Thread Raunak (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16896003#comment-16896003
 ] 

Raunak commented on KAFKA-821:
--

[~nehanarkhede]

I don't think this is working. My reassignment gets stuck if I provide a 
non-existent broker ID. My Kafka version is 0.10.1.1.

 

 
{code:java}
./kafka-reassign-partitions.sh --zookeeper zk:2181 --reassignment-json-file 
le.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"cv-topic","partition":0,"replicas":[1011131,101067,98,101240]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
{code}
Here, 98 is the non-existent broker. Deleting the reassign_partitions znode 
does not help either; when I describe the same topic, broker 98 is still shown 
as out of sync.

 

 
{code:java}
Topic:cv-topic PartitionCount:1 ReplicationFactor:4 Configs:
Topic: cv-topic Partition: 0 Leader: 1011131 Replicas: 1011131,101067,98,101240 
Isr: 1011131,101067,101240

{code}
Now 98 will always be out of sync.

 

> specifying a non-existent partition to ./bin/kafka-reassign-partitions.sh 
> breaks all reassignment ops
> -
>
> Key: KAFKA-821
> URL: https://issues.apache.org/jira/browse/KAFKA-821
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, tools
>Affects Versions: 0.8.0
>Reporter: Scott Clasen
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: kafka-0.8, p1
>
> From my ML post...the workaround is to manually delete 
> /admin/reassign_partitions from ZK, that node contained a json with an empty 
> partitions array.
> Have 3 brokers running. Ids 25,26,27
> ./bin/kafka-create-topic.sh --replica 3 --topic first-cluster-topic
> --zookeeper :2181/kafka
> Seems fine, can send/receive, etc..
> Kill 27, start 28. Try to reassign the single partition topic with the
> following json.
> Contains an error. partition should be 0 not 1.
>  {"partitions":
>  [{"topic": "first-cluster-topic", "partition": 1, "replicas": [25,26,28] }]
> }
> ./bin/kafka-reassign-partitions.sh  --zookeeper ... -path-to-json-file
> reassign.json
> 2013-03-21 12:14:46,170] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2013-03-21 12:14:46,310] ERROR Skipping reassignment of partition
> [first-cluster-topic,1] since it doesn't exist
> (kafka.admin.ReassignPartitionsCommand)
> Successfully started reassignment of partitions Map([first-cluster-topic,1]
> -> List(25, 26, 28))
> [2013-03-21 12:14:46,665] INFO Terminate ZkClient event thread.
> (org.I0Itec.zkclient.ZkEventThread)
> [2013-03-21 12:14:46,780] INFO Session: 0x13d8a63a3760007 closed
> (org.apache.zookeeper.ZooKeeper)
> [2013-03-21 12:14:46,780] INFO EventThread shut down
> (org.apache.zookeeper.ClientCnxn)
> Ok, fix the JSON
>  {"partitions":
>  [{"topic": "first-cluster-topic", "partition": 0, "replicas": [25,26,28] }]
> }
> ./bin/kafka-reassign-partitions.sh  --zookeeper ... -path-to-json-file
> reassign.json
> [2013-03-21 12:17:34,367] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> Partitions reassignment failed due to Partition reassignment currently in
> progress for Map(). Aborting operation
> kafka.common.AdminCommandFailedException: Partition reassignment currently
> in progress for Map(). Aborting operation
> at
> kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:91)
> at
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:65)
> at
> kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
>  ./bin/kafka-check-reassignment-status.sh --zookeeper ...
> --path-to-json-file reassign.json
> [2013-03-21 12:20:40,607] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> Exception in thread "main" java.lang.ClassCastException:
> scala.collection.immutable.Map$Map1 cannot be cast to
> [Lscala.collection.Map;
> at
> kafka.admin.CheckReassignmentStatus$.main(CheckReassignmentStatus.scala:44)
> at kafka.admin.CheckReassignmentStatus.main(CheckReassignmentStatus.scala)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-2758) Improve Offset Commit Behavior

2019-07-30 Thread Omkar Mestry (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895953#comment-16895953
 ] 

Omkar Mestry commented on KAFKA-2758:
-

Is this issue still open, and if so, can I assign it to myself?

> Improve Offset Commit Behavior
> --
>
> Key: KAFKA-2758
> URL: https://issues.apache.org/jira/browse/KAFKA-2758
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie, reliability
>
> There are two scenarios of offset committing that we can improve:
> 1) we can filter the partitions whose committed offset is equal to the 
> consumed offset, meaning there is no new consumed messages from this 
> partition and hence we do not need to include this partition in the commit 
> request.
> 2) we can make a commit request right after resetting to a fetch / consume 
> position either according to the reset policy (e.g. on consumer starting up, 
> or handling of out of range offset, etc), or through the {code} seek {code} 
> so that if the consumer fails right after these event, upon recovery it can 
> restarts from the reset position instead of resetting again: this can lead 
> to, for example, data loss if we use "largest" as reset policy while there 
> are new messages coming to the fetching partitions.
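
A minimal Scala sketch of improvement (1) above (the `offsetsToCommit` helper 
is hypothetical, not the actual consumer code): drop partitions whose consumed 
position equals the last committed offset from the commit request:

{code:java}
object CommitFilter {
  // Only commit partitions whose consumed position has moved past the last
  // committed offset; unchanged partitions are left out of the commit request.
  def offsetsToCommit(consumed: Map[String, Long],
                      committed: Map[String, Long]): Map[String, Long] =
    consumed.filter { case (partition, position) =>
      committed.get(partition).forall(_ != position)
    }

  def main(args: Array[String]): Unit = {
    val consumed  = Map("t-0" -> 42L, "t-1" -> 10L)
    val committed = Map("t-0" -> 42L, "t-1" -> 5L)
    println(offsetsToCommit(consumed, committed)) // Map(t-1 -> 10)
  }
}
{code}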



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2019-07-30 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895901#comment-16895901
 ] 

Sönke Liebau commented on KAFKA-1566:
-

Is this still something we are interested in doing?  I just closed KAFKA-1234 
as a duplicate of this, because it had no activity for even longer than this 
ticket.

Personally I think this is not really necessary, as most deployments will run 
from service runner tools like Upstart or Systemd which provide methods for 
setting environment variables. At the same time, it also won't hurt and might 
be useful for some people, so there is no real reason not to do it.

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>Priority: Major
>  Labels: newbie, windows
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how kafka-env.sh could look:
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35" 
> export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-1111) Broker prematurely accepts TopicMetadataRequests on startup

2019-07-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-1111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-1111.
-
Resolution: Abandoned

Closing as abandoned after no objections on the dev list. If this is indeed 
still an issue we can always reopen it.

> Broker prematurely accepts TopicMetadataRequests on startup
> ---
>
> Key: KAFKA-1111
> URL: https://issues.apache.org/jira/browse/KAFKA-1111
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Rosenberg
>Assignee: Neha Narkhede
>Priority: Major
>
> I have an issue where on startup, the broker starts accepting 
> TopicMetadataRequests before it has had metadata sync'd from the controller.  
> This results in a bunch of log entries that look like this:
> 2013-11-01 03:26:01,577  INFO [kafka-request-handler-0] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 9, 10 ] }, "version":1 }
> 2013-11-01 03:26:07,767  INFO [kafka-request-handler-1] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 9, 11 ] }, "version":1 }
> 2013-11-01 03:26:07,823  INFO [kafka-request-handler-1] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 }
> 2013-11-01 03:26:11,183  INFO [kafka-request-handler-2] admin.AdminUtils$ - 
> Topic creation { "partitions":{ "0":[ 10, 11 ] }, "version":1 }
> From an email thread, Neha remarks:
> Before a broker receives the first
> LeaderAndIsrRequest/UpdateMetadataRequest, it is technically not ready to
> start serving any request. But it still ends up serving
> TopicMetadataRequest which can re-create topics accidentally. It shouldn't
> succeed, but this is still a problem.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-1099) StopReplicaRequest and StopReplicaResponse should also carry the replica ids

2019-07-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-1099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-1099.
-
Resolution: Abandoned

Closing this as abandoned after asking for feedback on the dev list and 
receiving no objections.

> StopReplicaRequest and StopReplicaResponse should also carry the replica ids
> 
>
> Key: KAFKA-1099
> URL: https://issues.apache.org/jira/browse/KAFKA-1099
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.1
>Reporter: Neha Narkhede
>Assignee: Neha Narkhede
>Priority: Major
>
> The stop replica request and response only contain a list of partitions for 
> which a replica should be moved to offline/nonexistent state. But the replica 
> id information is implicit in the network layer as the receiving broker. This 
> complicates stop replica response handling on the controller. This blocks the 
> right fix for KAFKA-1097 since it requires invoking callback for processing a 
> StopReplicaResponse and it requires to know the replica id from the 
> StopReplicaResponse.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-1016) Broker should limit purgatory size

2019-07-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-1016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-1016.
-
Resolution: Not A Problem

Closing this as "not a problem", I believe the Purgatory redesign should help 
with the issue described here to a large extent.


> Broker should limit purgatory size
> --
>
> Key: KAFKA-1016
> URL: https://issues.apache.org/jira/browse/KAFKA-1016
> Project: Kafka
>  Issue Type: Bug
>  Components: purgatory
>Affects Versions: 0.8.0
>Reporter: Chris Riccomini
>Assignee: Joel Koshy
>Priority: Major
>
> I recently ran into a case where a poorly configured Kafka consumer was able 
> to trigger out of memory exceptions in multiple Kafka brokers. The consumer 
> was configured to have a fetcher.max.wait of Int.MaxValue.
> For low-volume topics, this configuration causes the consumer to block 
> frequently, and for long periods of time. [~junrao] informs me that the fetch 
> request will time out after the socket timeout is reached. In our case, this 
> was set to 30s.
> With several thousand consumer threads, the fetch request purgatory got into 
> the 100,000-400,000 range, which we believe triggered the out of memory 
> exception. [~nehanarkhede] claims to have seen similar behavior in other high 
> volume clusters.
> It kind of seems like a bad thing that a poorly configured consumer can 
> trigger out of memory exceptions in the broker. I was thinking maybe it makes 
> sense to have the broker try and protect itself from this situation. Here are 
> some potential solutions:
> 1. Have a broker-side max wait config for fetch requests.
> 2. Threshold the purgatory size, and either drop the oldest connections in 
> purgatory, or reject the newest fetch requests when purgatory is full.
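
A minimal Scala sketch of option 1 above (hypothetical names, offered only as 
an illustration of a broker-side cap; no such broker config is assumed to 
exist): clamp the client-requested fetch wait to a broker-side maximum so a 
misconfigured consumer cannot park requests in purgatory for the full socket 
timeout:

{code:java}
object FetchWaitCap {
  // Clamp the client-requested max wait to a broker-side cap so a consumer
  // configured with a huge wait cannot keep fetch requests in purgatory
  // until its socket timeout fires.
  def effectiveMaxWaitMs(requestedMaxWaitMs: Long, brokerMaxWaitMs: Long): Long =
    math.min(requestedMaxWaitMs, brokerMaxWaitMs)

  def main(args: Array[String]): Unit =
    println(effectiveMaxWaitMs(Int.MaxValue.toLong, brokerMaxWaitMs = 30000L)) // 30000
}
{code}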



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Resolved] (KAFKA-822) Reassignment of partitions needs a cleanup

2019-07-30 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-822.

Resolution: Abandoned

Closing this as abandoned after asking for feedback on the dev list. It's 
probably also fixed, but I'm not absolutely sure of that.

> Reassignment of partitions needs a cleanup
> --
>
> Key: KAFKA-822
> URL: https://issues.apache.org/jira/browse/KAFKA-822
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, tools
>Affects Versions: 0.8.0
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>Priority: Major
>  Labels: bugs
>
> 1. This is probably a left-over from when the ReassignPartitionsCommand used 
> to be blocking: 
> Currently, for each partition that is reassigned, controller deletes the 
> /admin/reassign_partitions zk path, and populates it with a new list with the 
> reassigned partition removed from the original list. This is probably an 
> overkill, and we can delete the zk path completely once the reassignment of 
> all partitions has completed successfully or in error. 
> 2. It will help to clarify that there could be no replicas that have started 
> and are not in the ISR when KafkaController.onPartitionReassignment() is 
> called.
> 3. We should batch the requests in 
> KafkaController.StopOldReplicasOfReassignedPartition()
> 4. Update controllerContext.partitionReplicaAssignment only once in 
> KafkaController.updateAssignedReplicasForPartition().
> 5. Need to thoroughly test.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7376) After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers on SASL_SSL

2019-07-30 Thread Aldan Brito (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16895844#comment-16895844
 ] 

Aldan Brito commented on KAFKA-7376:


Hi [~ijuma]

We are also facing a similar issue w.r.t. SSL hostname verification.

Scenario: 
we have two Kafka listeners, internal and external.
The internal listener is mapped to the FQDN of the broker,
   eg: internal://FQDN:9092
The external listener is mapped to a user-defined name,
   eg: external://testkafka:8109

While generating the SSL certificates, we used the FQDN of the broker as the 
CN, and both listener names are included in the SAN entries.

When the client does a handshake with the external listener, i.e. the 
producer's broker-list config set to external://testkafka:8109, we get the 
exceptions below.
{code:java}
Caused by: java.security.cert.CertificateException: No name matching testkafka 
found
at sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:231)
at sun.security.util.HostnameChecker.match(HostnameChecker.java:96)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
at 
sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:436)
at 
sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:252)
at 
sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1626)
{code}
If we disable ssl.endpoint.identification.algorithm for the external listener, 
the handshake goes through fine.

If we instead map both the internal and external listeners to the FQDN and 
generate the certificates with the FQDN as the CN, 
for eg: 
  internal://FQDN:9092
  external://FQDN:8109

then a client request with the producer's broker-list config set to 
external://FQDN:8109 works fine.

It looks like the broker-list DNS name is verified only against the CN and the 
SAN entries are not considered.

decrypted server keystore snapshot:
{code:java}
Signature algorithm name: SHA256withRSA
Subject Public Key Algorithm: 2048-bit RSA key
Version: 3

Extensions:

#1: ObjectId: 2.5.29.19 Criticality=false
BasicConstraints:[
CA:true
PathLen:2147483647
]

#2: ObjectId: 2.5.29.37 Criticality=false
ExtendedKeyUsages [
serverAuth
clientAuth
]

#3: ObjectId: 2.5.29.15 Criticality=false
KeyUsage [
DigitalSignature
Non_repudiation
Key_Encipherment
]

#4: ObjectId: 2.5.29.17 Criticality=false
SubjectAlternativeName [
DNSName: kf-mykaf-0.kf-mykaf-headless.default.svc.cluster.local
DNSName: testkafka
]

{code}

> After Kafka upgrade to v2.0.0 , Controller unable to communicate with brokers 
> on SASL_SSL
> -
>
> Key: KAFKA-7376
> URL: https://issues.apache.org/jira/browse/KAFKA-7376
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sridhar
>Priority: Major
>
> Hi ,
> We upgraded our Kafka cluster (3 nodes running on AWS) to version 2.0.0 and 
> enabled security with SASL_SSL (PLAIN) for inter-broker and client 
> connections.
> But there are a lot of errors in the controller log for the inter-broker 
> communication. I have followed exactly the same steps as mentioned in the 
> documentation and set every Kafka broker's FQDN hostname in the SAN 
> (SubjectAlternativeName) of my self-signed server certificate.
> http://kafka.apache.org/documentation.html#security
>  
> openssl s_client -connect kafka-3:9093
>  CONNECTED(0003)
>  depth=1
> Noticed someone else also facing a similar problem.
> [https://github.com/confluentinc/common/issues/158]
>  
>  
> {noformat}
> Server Configuration : 
> listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
> advertised.listeners=PLAINTEXT://kafka-3:9092,SASL_SSL://kafka-3:9093
> #Security
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
> allow.everyone.if.no.acl.found=false
> security.inter.broker.protocol=SASL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> sasl.enabled.mechanisms=PLAIN
> super.users=User:admin
> ssl.client.auth=required
> ssl.endpoint.identification.algorithm=
> ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
> ssl.truststore.password=
> ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
> ssl.keystore.password=
> ssl.key.password=
> #Zookeeper
> zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181
> zookeeper.connection.timeout.ms=6000
> {noformat}
>  
>  
> {code:java}
>  
> [2018-09-04 12:02:57,289] WARN [RequestSendThread controllerId=2] Controller 
> 2's connection to broker kafka-3:9093 (id: 3 rack: eu-central-1c) was 
> unsuccessful (kafka.controller.RequestSendThread)
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
> Caused by: