[GitHub] [kafka] xiao-penglei commented on pull request #9872: KAFKA-10759: ARM support for Kafka

2021-01-27 Thread GitBox


xiao-penglei commented on pull request #9872:
URL: https://github.com/apache/kafka/pull/9872#issuecomment-768859211


   > @xiao-penglei : Thanks for the PR. Did the "Arm Build" get triggered in 
Jenkins? I can't seem to find it.
   
   @junrao When you create a Jenkins pipeline, there are two ways to configure 
the Jenkinsfile source: one is to use the Jenkinsfile from the repository, and 
the other is to use a static Jenkinsfile. I do not know how the Jenkins 
pipeline for Kafka is configured because I do not have permission. The "Arm 
Build" stage can only be found after the Jenkinsfile used by the Jenkins 
pipeline is modified. The Jenkinsfile change that I submitted was tested 
locally; now we need to modify the Jenkinsfile used by the Jenkins pipeline. 
Maybe @ijuma can help us. Thank you. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273379#comment-17273379
 ] 

Boyang Chen commented on KAFKA-9689:


I agree with A) since it is an internal struct.

> Automatic broker version detection to initialize stream client
> --
>
> Key: KAFKA-9689
> URL: https://issues.apache.org/jira/browse/KAFKA-9689
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> Eventually we shall deprecate the flag that suppresses the EOS thread producer 
> feature; instead, we will take a version-detection approach on the broker to 
> decide which semantics to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] xiao-penglei commented on a change in pull request #9872: KAFKA-10759: ARM support for Kafka

2021-01-27 Thread GitBox


xiao-penglei commented on a change in pull request #9872:
URL: https://github.com/apache/kafka/pull/9872#discussion_r565854978



##
File path: Jenkinsfile
##
@@ -160,5 +160,23 @@ pipeline {
         }
       }
     }
+    stage("Arm Build") {
+      agent { label 'arm4' }
+      options {
+        timeout(time: 8, unit: 'HOURS')
+        timestamps()
+      }
+      environment {

Review comment:
   @junrao Yeah, I have installed openjdk-8u252-b09, the same version as 
jdk_1.8_latest of Apache infra. The reason is that I could not find an 
appropriate JDK version that can run on the ARM platform in the Jenkins 
software of Apache infra ([infra JDK 
matrix](https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix)). 
I have also installed maven-3.6.3, the same version as maven_3_latest of 
Apache infra. I will take time to ensure the versions of the JDK and Maven 
stay consistent with those of Apache infra. In most cases, the arm4 node is 
used only for the Kafka ARM CI. 





[GitHub] [kafka] showuon commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed

2021-01-27 Thread GitBox


showuon commented on pull request #9791:
URL: https://github.com/apache/kafka/pull/9791#issuecomment-768830962


   @kkonstantine, thanks for the comments. So do you mean this issue can be kept 
as is? Or should we find another way to implement it, instead of the bookkeeping 
approach?







[jira] [Resolved] (KAFKA-10658) ErrantRecordReporter.report always returns a completed future even though the record is not sent to the DLQ topic yet

2021-01-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10658.

Fix Version/s: 2.6.2
               2.7.1
               2.8.0
   Resolution: Fixed

> ErrantRecordReporter.report always returns a completed future even though the 
> record is not sent to the DLQ topic yet 
> ---
>
> Key: KAFKA-10658
> URL: https://issues.apache.org/jira/browse/KAFKA-10658
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> This issue happens when both the DLQ and the error log are enabled. There is an 
> incorrect filter in the handling of multiple reports, and it results in the 
> uncompleted future being filtered out. Hence, users always receive a completed 
> future even though the record is still in the producer buffer.





[GitHub] [kafka] chia7712 commented on pull request #9525: KAFKA-10658 ErrantRecordReporter.report always return completed futur…

2021-01-27 Thread GitBox


chia7712 commented on pull request #9525:
URL: https://github.com/apache/kafka/pull/9525#issuecomment-768828561


   Merged to trunk, 2.7, and 2.6.







[GitHub] [kafka] chia7712 merged pull request #9525: KAFKA-10658 ErrantRecordReporter.report always return completed futur…

2021-01-27 Thread GitBox


chia7712 merged pull request #9525:
URL: https://github.com/apache/kafka/pull/9525


   







[GitHub] [kafka] g1geordie commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-27 Thread GitBox


g1geordie commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r565834861



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##
@@ -1004,10 +998,6 @@ public void testWithRecords(Args args) {
 }

Review comment:
   It sounds like you changed `assume (condition)` to `if (condition) ... 
else ...` in all methods.
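   For illustration, a minimal sketch of that refactor, with hypothetical 
stand-ins (`Args` and `buildRecords` are not the real test fixtures): instead of 
skipping parameter combinations via an assumption, every combination takes an 
explicit branch and asserts something.

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

class RefactorSketch {
    // Hypothetical stand-in for the real parameter holder in MemoryRecordsTest.
    record Args(byte magic) {}

    // Hypothetical stand-in for record construction; old magic values reject it.
    static int buildRecords(Args args) {
        if (args.magic() < 2) throw new IllegalArgumentException("unsupported magic");
        return 2;
    }

    // Before: combinations failing the assumption are silently skipped.
    void testWithRecordsBefore(Args args) {
        assumeTrue(args.magic() >= 2);
        assertEquals(2, buildRecords(args));
    }

    // After: every combination is exercised; unsupported ones assert the failure.
    void testWithRecordsAfter(Args args) {
        if (args.magic() >= 2) {
            assertEquals(2, buildRecords(args));
        } else {
            assertThrows(IllegalArgumentException.class, () -> buildRecords(args));
        }
    }
}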









[jira] [Updated] (KAFKA-10658) ErrantRecordReporter.report always returns a completed future even though the record is not sent to the DLQ topic yet

2021-01-27 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10658:
---
Affects Version/s: 2.6.0

> ErrantRecordReporter.report always returns a completed future even though the 
> record is not sent to the DLQ topic yet 
> ---
>
> Key: KAFKA-10658
> URL: https://issues.apache.org/jira/browse/KAFKA-10658
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> This issue happens when both the DLQ and the error log are enabled. There is an 
> incorrect filter in the handling of multiple reports, and it results in the 
> uncompleted future being filtered out. Hence, users always receive a completed 
> future even though the record is still in the producer buffer.





[jira] [Commented] (KAFKA-12169) Consumer can not know partitions change when client leader restarts with static membership protocol

2021-01-27 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273328#comment-17273328
 ] 

Boyang Chen commented on KAFKA-12169:
-

In general, the leader should be able to detect a metadata discrepancy between 
its remembered topic metadata and the broker-side metadata. I don't think we have 
any test case that covers both a topic partition change and a leader rejoin at 
the same time, so it's possible and needs some verification. 
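As a rough illustration of the kind of check being described (hypothetical names, 
not the actual consumer internals), the group leader could compare the partition 
counts it remembered at assignment time against fresh broker metadata and trigger 
a rebalance when they differ:

import java.util.Map;

class MetadataDiscrepancySketch {
    // Hypothetical check: the leader remembers the partition counts it used for
    // the last assignment and compares them against fresh broker metadata.
    static boolean needsRebalance(final Map<String, Integer> remembered,
                                  final Map<String, Integer> current) {
        return !remembered.equals(current);
    }

    public static void main(String[] args) {
        final Map<String, Integer> remembered = Map.of("topic-a", 1000);
        final Map<String, Integer> current = Map.of("topic-a", 2000); // partitions added
        if (needsRebalance(remembered, current)) {
            System.out.println("metadata changed; the leader should trigger a rebalance");
        }
    }
}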

> Consumer can not know partitions change when client leader restarts with static 
> membership protocol
> 
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on Kubernetes, and the services often restart because of 
> operations. When we added partitions (from 1000 to 2000) to the topic while the 
> client leader restarted with an unknown member id, we found that the consumers 
> did not trigger a rebalance and still consumed 1000 partitions.
>  





[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-27 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-      leaderEpoch = epoch,
-      origin = AppendOrigin.Coordinator)
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(
+      log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+        leaderEpoch = epoch,
+        origin = AppendOrigin.Coordinator
+      )
+    )
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
     if (records.sizeInBytes == 0)
       throw new IllegalArgumentException("Attempt to append an empty record set")
 
-    val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-    new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-      throw new KafkaException("Append failed unexpectedly")
-    }, appendInfo.lastOffset)
+    handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+    appendInfo.firstOffset match {
+      case Some(firstOffset) =>
+        if (firstOffset.relativePositionInSegment == 0) {
+          // Assume that a new segment was created if the relative position is 0
+          log.deleteOldSegments()
+        }
+        new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+      case None =>
+        throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+    }
   }
 
   override def lastFetchedEpoch: Int = {
-    log.latestEpoch.getOrElse(0)
+    log.latestEpoch.getOrElse {
+      latestSnapshotId.map { snapshotId =>
+        val logEndOffset = endOffset().offset
+        if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+          // Return the epoch of the snapshot when the log is empty
+          snapshotId.epoch
+        } else {
+          throw new KafkaException(
+            s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+            s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " +
+            s"and the log start offset ($startOffset)"
+          )
+        }
+      }.orElse(0)
+    }
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
     val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-      new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+      if (oldestSnapshotId.isPresent() &&
+        offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+        offsetAndEpoch.leaderEpoch == leaderEpoch) {
Review comment:
   First, thanks a lot for thinking through this code and providing such 
detailed comments. This code is important to get right.
   
   > the requested epoch is larger than any known epoch.
   
   For this case I decided to throw an exception, because the Fetch request 
handling code already checks for this condition and returns an error Fetch 
response. The leader returns an error Fetch response when this invariant is 
violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other 
words, based on the current implementation, I think it is a bug if 
`endOffsetForEpoch` returns `Optional.empty()`.
   
   1. 
https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954
   2. 
https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621
   
   > the requested epoch is less than any known epoch we have
   
   When thinking through this case, I convinced myself that the leader can 
determine whether it should send a snapshot simply by comparing the "fetch 
offset" and "last fetched epoch" against the `oldestSnapshotId`. The 
`oldestSnapshotId` is the snapshot whose end offset equals the log start offset.
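   A rough sketch of that comparison in Java (names are illustrative, not the 
actual KafkaRaftClient API): the leader sends a snapshot when the follower's 
position falls at or before the oldest snapshot, i.e. when the data it would 
otherwise fetch has been truncated away.

class SnapshotCheckSketch {
    // Illustrative value type; the real code has its own OffsetAndEpoch.
    record OffsetAndEpoch(long offset, int epoch) {}

    // The oldest snapshot's end offset equals the log start offset, so a follower
    // whose fetch position is before it (or at it with a diverging epoch)
    // cannot be served from the log and needs the snapshot instead.
    static boolean shouldSendSnapshot(long fetchOffset,
                                      int lastFetchedEpoch,
                                      OffsetAndEpoch oldestSnapshotId) {
        return fetchOffset < oldestSnapshotId.offset()
            || (fetchOffset == oldestSnapshotId.offset()
                && lastFetchedEpoch != oldestSnapshotId.epoch());
    }
}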
   
   > The current epoch cache implementation handles this by returning the 
requested epoch with an end offset equal to the log start offset. So we detect 
the case here by checking that the returned epoch matches the requested epoch 
and the end offset matches the offset corresponding to the oldest snapshot, 
which should be the same as the log start offset. Right so far?
   
   Correct. My comment here assumes that the fetch offset is between the log 
start offset and the log end offset, and that sending a snapshot is not 
required. When thinking through 
[GitHub] [kafka] chia7712 commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r565826783



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##
@@ -1004,10 +998,6 @@ public void testWithRecords(Args args) {
 }

Review comment:
   @g1geordie I file a patch for aforementioned idea. Please take a look at 
https://github.com/chia7712/kafka/pull/1/files
   
   it uses explicit assert (exception or expected value) for all parameters 
instead of just ignoring them
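   As a hedged sketch of what "explicit asserts for all parameters" can look 
like (hypothetical `build` helper and a hypothetical enum mirroring the real 
CompressionType; see the linked patch for the real change), every combination 
maps to either an expected value or an expected exception:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ExplicitAssertSketch {
    enum CompressionType { NONE, GZIP, ZSTD } // hypothetical, mirrors the real enum

    // Hypothetical builder: ZSTD requires magic v2, as in MemoryRecordsBuilder.
    static byte build(byte magic, CompressionType type) {
        if (magic < 2 && type == CompressionType.ZSTD)
            throw new IllegalArgumentException("ZSTD needs magic >= 2");
        return magic;
    }

    // Every parameter combination gets an explicit expectation: an expected
    // value when supported, an expected exception when not.
    void checkCompressionSupport(byte magic, CompressionType type) {
        if (magic < 2 && type == CompressionType.ZSTD) {
            assertThrows(IllegalArgumentException.class, () -> build(magic, type));
        } else {
            assertEquals(magic, build(magic, type));
        }
    }
}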
   





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565823251



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
      * no stream threads are alive
      */
     public Optional<String> removeStreamThread() {
+        return removeStreamThread(Long.MAX_VALUE);
+    }
+
+    /**
+     * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+     * 
+     * The removed stream thread is gracefully shut down. This method does not specify which stream
+     * thread is shut down.
+     * 
+     * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+     * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+     * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+     *
+     * @param timeout The length of time to wait for the thread to shut down
+     * @throws TimeoutException if the thread does not stop in time
+     * @return name of the removed stream thread or empty if a stream thread could not be removed because
+     * no stream threads are alive
+     */
+    public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+        final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+        return removeStreamThread(timeoutMs);
+    }
+
+    private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+        final long begin = time.milliseconds();
         if (isRunningOrRebalancing()) {
             synchronized (changeThreadCount) {
                 // make a copy of threads to avoid holding lock
                 for (final StreamThread streamThread : new ArrayList<>(threads)) {
                     if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
                             || threads.size() == 1)) {
+                        final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                         streamThread.shutdown();
                         if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-                            streamThread.waitOnThreadState(StreamThread.State.DEAD);
+                            if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+                                log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+                                throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Okay, I buy it. I'll delay the exception.
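   For context, a usage sketch of the API being added here (assuming the PR's 
`Optional<String> removeStreamThread(Duration)` signature): remove a thread, but 
give up and handle the timeout if it does not stop in time.

import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.KafkaStreams;

class RemoveThreadUsageSketch {
    // Usage sketch: scale down by one thread, waiting at most one minute.
    void scaleDown(final KafkaStreams streams) {
        try {
            final Optional<String> removed = streams.removeStreamThread(Duration.ofMinutes(1));
            removed.ifPresent(name -> System.out.println("Removed " + name));
        } catch (final TimeoutException e) {
            // The thread did not reach DEAD within the timeout; it is still shutting down.
            System.err.println("Timed out waiting for thread shutdown: " + e.getMessage());
        }
    }
}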









[GitHub] [kafka] dengziming commented on pull request #9982: MINOR: remove some explicit type argument in generator

2021-01-27 Thread GitBox


dengziming commented on pull request #9982:
URL: https://github.com/apache/kafka/pull/9982#issuecomment-768802003


   @chia7712 @cmccabe  Hello, PTAL.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565820783



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }

Review comment:
   yes









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565815347



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
     }
 
     private boolean close(final long timeoutMs) {
-        if (state == State.ERROR) {
-            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+        if (state == State.ERROR || state == State.NOT_RUNNING) {
+            log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
             return true;
         }
-        if (state == State.PENDING_ERROR) {
-            log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped.");
-            if (waitOnState(State.ERROR, timeoutMs)) {
+        if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) {
+            log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
+            if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
                 log.info("Streams client stopped to ERROR completely");
                 return true;
+            } else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
+                log.info("Streams client stopped to NOT_RUNNING completely");
+                return true;
             } else {
-                log.info("Streams client cannot transition to ERROR completely within the timeout");
+                log.warn("Streams client cannot transition to {}} completely within the timeout", state);

Review comment:
   The state here doesn't make the log message make sense: if the state is 
`PENDING_ERROR`, then the log should say ERROR.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
 
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);
+        } catch (final IOException e) {
+            log.error("Unable to lock the state directory due to unexpected exception", e);
+            throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+        }
+
+        return stateDirLock != null;
+    }
+
+    public UUID initializeProcessId() {

Review comment:
   since it doesn't seem that we need to be very thrifty with space for 
this file would it make sense to write it in a more friendly format that would 
be easier to maintain? i.e. json or something, we are giving it a version 
number...

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
     }
 
     private boolean close(final long timeoutMs) {
-        if (state == State.ERROR) {
-            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+        if (state == State.ERROR || state == State.NOT_RUNNING) {

Review comment:
   I think this change makes a lot of sense. I don't think it changes the final 
behavior, besides avoiding the extra state-change rejections in the logs; it 
looks like those are replaced.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
         }
     }
 
+    /**
+     * @return true if the state directory was successfully locked
+     */
+    private boolean lockStateDirectory() {
+        final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+        try {
+            stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+            stateDirLock = tryLock(stateDirLockChannel);

Review comment:
   Is there any case where we might want to release the lock on this state 
directory? It looks like we just hold it.









[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-27 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {

[GitHub] [kafka] vvcephei commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2021-01-27 Thread GitBox


vvcephei commented on pull request #9420:
URL: https://github.com/apache/kafka/pull/9420#issuecomment-768789634


   Hey @dongjinleekr ,
   
   Sorry for the force-push, but I had to rebase this and resolve a conflict 
before merging. Note that the conflict was from 
462c89e0b436abd56864bea8bbcaf1ab70b7f66e, which re-organized the boolean 
conditions in the StateDirectory constructor, specifically where we warn if the 
state dir is a temp dir.
   
   After resolving the conflict, I noticed there's no test for that warning, so 
I added one to be sure it works. It also looked like the temp dir check could 
actually be a bit simpler, so I just tweaked it rather than leaving a new 
comment for you to address.
   
   I hope this is all ok. I'll let the tests run and merge in the morning, 
unless you have any objections.
   
   Thanks!
   -John







[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2021-01-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-3745:
--

Assignee: Bill Bejeck

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Bill Bejeck
>Priority: Minor
>  Labels: api, kip
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.
> KIP-149: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner]
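For illustration, a sketch of the kind of interface extension being discussed 
(the shape here is hypothetical; see KIP-149 for the actual proposal): a joiner 
variant that also receives the read-only join key.

class JoinerSketch {
    // A joiner variant that receives the join key in addition to both values.
    @FunctionalInterface
    interface ValueJoinerWithKey<K, V1, V2, VR> {
        VR apply(K readOnlyKey, V1 value1, V2 value2);
    }

    // Usage: the joined value is built from the key and both sides of the join.
    ValueJoinerWithKey<String, Long, Long, String> joiner =
        (key, left, right) -> key + ":" + (left + right);
}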





[GitHub] [kafka] ableegoldman commented on pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#issuecomment-768786220


   Not done with the tests, but I'd appreciate some feedback on the non-testing 
code and general idea -- any takers for review? @cadonna @vvcephei 
@guozhangwang @wcarlson5 @lct45 







[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565806577



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##
@@ -112,6 +118,19 @@ public void createTopics() throws Exception {
 CLUSTER.createTopic(outputTopic, 1, 3);
 }
 
+    @After
+    public void cleanUp() {
+        if (streamInstanceOne != null) {
+            streamInstanceOne.close();
+        }
+        if (streamInstanceTwo != null) {
+            streamInstanceTwo.close();
+        }
+        if (streamInstanceOneRecovery != null) {
+            streamInstanceOneRecovery.close();
+        }

Review comment:
   There are no logical changes to this test, I just had to refactor it a 
bit because we were creating two copies of the same KafkaStreams at the same 
time (with the same app.dir & state.dir), even though one of them wasn't 
started until much later. Since we do the state initialization inside the 
KafkaStreams constructor, this was no good









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565805758



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -416,11 +524,15 @@ private void cleanRemovedTasksCalledByUser() throws Exception {
                         logPrefix(), dirName, id),
                     exception
                 );
-                throw exception;

Review comment:
   The IDE was giving me a warning.









[GitHub] [kafka] vvcephei merged pull request #9840: KAFKA-10867: Improved task idling

2021-01-27 Thread GitBox


vvcephei merged pull request #9840:
URL: https://github.com/apache/kafka/pull/9840


   







[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling

2021-01-27 Thread GitBox


vvcephei commented on pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#issuecomment-768782409


   Hmm, the Java 8 build appears to have hung after an hour and 58 minutes. 
It's been running for 3 hours and 30 minutes now. This is now the 16th build, 
and there have been multiple Java 8 successes to date, so I think it's 
environmental.
   
   I'll go ahead with the merge.







[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901
 ] 

feyman edited comment on KAFKA-9689 at 1/28/21, 3:54 AM:
-

The version detection flow leveraging the versioning system is as described in 
the section "Use case: {{group_coordinator}} feature flag" in KIP-584.

The code change mainly contains 3 parts:

1) The StreamThread should know whether it is the leader in the consumer group; 
if yes, it should periodically query the describeFeatures API to see if there 
are feature metadata updates.

2) There should be some place to put the feature metadata in the MemberMetadata, 
either in the assignment (userData) or in a new field in the MemberMetadata 
(which involves a public interface change). The current implementation leverages 
the assignment (a rough sketch of a possible encoding follows this comment):

2.1 Each StreamThread puts the feature metadata (the EOS feature version) in the 
SubscriptionInfo when it subscribes.

2.2 Upon receiving the JoinGroup response, the leader will know the current 
broker-side feature version; it can put that version (if updated) in the 
assignment as the suggested feature version.

2.3 When a follower receives the assignment in the SyncGroup response, it will 
find the new broker-side latest feature version.

3) The StreamThread should dynamically switch to the new thread producer without 
affecting the existing tasks.

I'm implementing the code in the sequence above, currently on 2, but we need to 
discuss whether step 2 makes sense; I haven't started step 3 yet.

Questions for [~bchen225242]:

A) 2.1 might need a new field in the SubscriptionInfoData to include the 
client-side feature metadata. That seems OK to me, since SubscriptionInfoData is 
stream-specific and doesn't seem to need a KIP. Thoughts?
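As a loose illustration of 2.1-2.3 (the encoding here is hypothetical; the 
actual SubscriptionInfoData schema is stream-internal), the feature version 
could travel as a small blob inside the subscription/assignment userData:

import java.nio.ByteBuffer;

class FeatureVersionSketch {
    // Hypothetical wire sketch: a single int carrying the EOS feature version
    // inside the subscription/assignment userData.
    static ByteBuffer encodeFeatureVersion(final int eosFeatureVersion) {
        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES);
        buf.putInt(eosFeatureVersion);
        buf.flip();
        return buf;
    }

    static int decodeFeatureVersion(final ByteBuffer userData) {
        return userData.duplicate().getInt();
    }
}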

 


was (Author: feyman):
The version detection flow leveraging the versioning system is as described in 
the section "Use case: {{group_coordinator}} feature flag" in KIP-584.

The code change mainly contains 3 parts:

1) The StreamThread should know whether it is the leader in the consumer group; 
if yes, it should periodically query the describeFeatures API to see if there 
are feature metadata updates.

2) There should be some place to put the feature metadata in the MemberMetadata, 
either in the assignment (userData) or in a new field in the MemberMetadata 
(which involves a public interface change). The current implementation leverages 
the assignment.

3) The StreamThread should dynamically switch to the new thread producer without 
affecting the existing tasks.

> Automatic broker version detection to initialize stream client
> --
>
> Key: KAFKA-9689
> URL: https://issues.apache.org/jira/browse/KAFKA-9689
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> Eventually we shall deprecate the flag that suppresses the EOS thread producer 
> feature; instead, we will take a version-detection approach on the broker to 
> decide which semantics to use.





[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565803134



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -54,14 +61,27 @@
     private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
     static final String LOCK_FILE_NAME = ".lock";
 
+    /* The process file is used to persist the process id across restarts.
+     * The version 0 schema consists only of the version number and UUID
+     *
+     * If you need to store additional metadata of the process you can bump the version number and append new fields.
+     * For compatibility reasons you should only ever add fields, and only by appending them to the end
+     */
+    private static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
+    private static final int PROCESS_FILE_VERSION = 0;

Review comment:
   No idea if we'll ever want to add anything else to this file, but better 
to be safe and forward compatible than sad
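   For illustration, a rough sketch of how such a versioned file can be read and 
written (illustrative only, not the PR's actual serialization code): the version 
is written first, and a reader consumes only the fields its known versions 
define, so newer appended fields are simply ignored.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;

class ProcessFileSketch {
    // Illustrative version-0 layout: [int version][long msb][long lsb].
    static void writeProcessFile(final File file, final UUID processId) throws IOException {
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file))) {
            out.writeInt(0);                                   // version 0
            out.writeLong(processId.getMostSignificantBits());
            out.writeLong(processId.getLeastSignificantBits());
        }
    }

    static UUID readProcessFile(final File file) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(file))) {
            final int version = in.readInt();                  // always read first
            if (version < 0) {
                throw new IOException("Corrupt process file: version " + version);
            }
            // Version-0 fields; a newer writer may have appended more after these,
            // which an old reader can safely ignore.
            return new UUID(in.readLong(), in.readLong());
        }
    }
}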









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565802847



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+    @Test
+    public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+        try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+            addStreamStateChangeListener(kafkaStreams);
+            startStreamsAndWaitForRunning(kafkaStreams);
+
+            final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+            stateTransitionHistory.clear();
+            assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   But it isn't consistent, because if the thread removes itself then the 
timeout is started.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802766



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
     }
 
     private boolean close(final long timeoutMs) {
-        if (state == State.ERROR) {
-            log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+        if (state == State.ERROR || state == State.NOT_RUNNING) {

Review comment:
   Something I noticed during testing: I feel it makes sense for the handling of 
ERROR and NOT_RUNNING to be parallel (same for the PENDING_ flavors). This is a 
slight change in behavior; now, if a user calls `close()` while the instance is 
already closing, it will wait for the ongoing shutdown to complete before 
returning (with the timeout).
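   For context, a usage sketch of the resulting behavior (using the existing 
`KafkaStreams.close(Duration)` API): a close() call made while the client is 
already in PENDING_SHUTDOWN now waits, up to the timeout, for NOT_RUNNING.

import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

class CloseUsageSketch {
    // A close() issued while an earlier shutdown is in flight now waits, up to
    // the timeout, for the terminal state instead of returning immediately.
    void shutdown(final KafkaStreams streams) {
        final boolean clean = streams.close(Duration.ofSeconds(30));
        if (!clean) {
            System.err.println("Streams client did not reach a terminal state in time");
        }
    }
}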









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
         return streamThread;
     }
 
+    private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+        final MetricConfig metricConfig = new MetricConfig()
+            .samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+            .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+        final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                                                                              MetricsReporter.class,
+                                                                              Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
+        final JmxReporter jmxReporter = new JmxReporter();
+        jmxReporter.configure(config.originals());
+        reporters.add(jmxReporter);
+        final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+                                                                      config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+        return new Metrics(metricConfig, reporters, time, metricsContext);
+    }
+
+    private int getNumStreamThreads(final boolean hasGlobalTopology) {
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.info("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Topology with no input topics will create no stream threads and no global thread.");
+            throw new TopologyException("Topology has no stream threads and no global threads, " +
+                                        "must subscribe to at least one source topic or global table.");
+        }
+        return numStreamThreads;

Review comment:
   Just tried to factor some of the self-contained logic into helper 
methods, since I found it incredibly difficult to get oriented within the 
super-long KafkaStreams constructor









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565801932



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
  final Time time) throws StreamsException {
 this.config = config;
 this.time = time;
+
+this.internalTopologyBuilder = internalTopologyBuilder;
+internalTopologyBuilder.rewriteTopology(config);
+
+// sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
+taskTopology = internalTopologyBuilder.buildTopology();
+globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+
+final boolean hasGlobalTopology = globalTaskTopology != null;
+final boolean hasPersistentStores = 
taskTopology.hasPersistentLocalStore() ||
+(hasGlobalTopology && 
globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, 
hasPersistentStores);
+processId = stateDirectory.initializeProcessId();

Review comment:
   this is the only logical change in the KafkaStreams constructor: the 
rest of the diff is due to moving things around in order to get everything 
initialized in the proper order
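
As a rough illustration of the idea behind `initializeProcessId()` (not the actual StateDirectory code; the file name and layout below are invented for the sketch), persisting the UUID might look like:

```java
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Sketch: reuse a previously persisted processId if one exists in the state
// directory, otherwise generate a fresh UUID and persist it so the same id
// survives a restart.
static UUID initializeProcessId(final File stateDir) throws java.io.IOException {
    final Path idFile = new File(stateDir, "kafka-streams-process-metadata").toPath();
    if (Files.exists(idFile)) {
        return UUID.fromString(new String(Files.readAllBytes(idFile), StandardCharsets.UTF_8).trim());
    }
    final UUID processId = UUID.randomUUID();
    Files.write(idFile, processId.toString().getBytes(StandardCharsets.UTF_8));
    return processId;
}
```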





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565797234



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   It does seem like kind of a gray area. Still, the TimeoutException isn't 
necessarily saying that it failed, just that we didn't wait long enough for it 
to finish the shutdown. But we have at least definitely initiated the shutdown 
-- besides, if the thread really is stuck in its shutdown then it's probably a 
benefit to go ahead with the `removeMembersFromConsumerGroup` call to get it 
kicked out all the sooner.
   
   But, in the end, we really make no guarantees about the application should a 
user choose to ignore the  TimeoutException (though they absolutely can). I can 
imagine that some users might choose to just swallow it and decide that they 
don't care if the shutdown is taking a long time. It's hard to say
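
From the caller's side, that choice might look like this sketch (`streams` and the ten-second timeout are assumptions; the exception is `org.apache.kafka.common.errors.TimeoutException`):

```java
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.KafkaStreams;

// Sketch: the shutdown is initiated either way; the caller decides whether
// a slow shutdown is an error or merely something to note and move on from.
static Optional<String> removeThreadLeniently(final KafkaStreams streams) {
    try {
        return streams.removeStreamThread(Duration.ofSeconds(10));
    } catch (final TimeoutException swallowed) {
        // The thread is still shutting down in the background; treat the
        // removal as "in progress" rather than failed.
        return Optional.empty();
    }
}
```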





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-27 Thread GitBox


chia7712 commented on pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#issuecomment-768769711


   For another, ```testWriteControlBatchNotAllowedMagicV0``` and 
```testWriteControlBatchNotAllowedMagicV1``` are almost the same. Could we 
merge them into a single test case?
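
One way to merge them is a parameterized test over the magic value, sketched below (assuming JUnit 5's `@ParameterizedTest`; the class name and helper are placeholders for whatever the two real tests share):

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class MemoryRecordsBuilderParamTest {

    // Placeholder for the shared setup the two existing tests perform,
    // differing only in the magic byte they pass.
    private void writeControlBatchWithMagic(final byte magic) {
        throw new IllegalArgumentException("control batches need magic >= 2, got " + magic);
    }

    // Sketch: one test body covering both magic v0 and v1, instead of two
    // near-identical copies.
    @ParameterizedTest
    @ValueSource(bytes = {0, 1})
    void testWriteControlBatchNotAllowed(final byte magic) {
        assertThrows(IllegalArgumentException.class, () -> writeControlBatchWithMagic(magic));
    }
}
```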



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] inponomarev commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


inponomarev commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-768769254


   > @inponomarev the failing tests seems to be due to a known issue that was 
fixed via #9768
   > 
   > Can you rebase your PR to pickup the fix so we can get a green build?
   
   Done rebasing, expect the fixes according to your latest review soon!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r565786393



##
File path: core/src/main/scala/kafka/server/Server.scala
##
@@ -46,6 +46,22 @@ object Server {
 new Metrics(metricConfig, reporters, time, true, metricsContext)
   }
 
+  def initializeMetrics(
+config: KafkaConfig,
+time: Time,
+metaProps: MetaProperties

Review comment:
   It seems to me the properties in ```MetaProperties``` duplicate those in 
```KafkaConfig``` in this case. Is there any reason we need to pass 
```MetaProperties```?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565785613



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> 
kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   I don't either...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hmm. That is interesting. I am not sure. If the thread hasn't been 
removed then we don't want to resize the cache. The timeout is essentially 
saying that removing the thread failed. So is it right to then remove it 
anyway? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hmm. That is interesting. I am not sure. If the thread hasn't been 
removed then we don't want to resize the cache, so would removing the thread 
and then throwing an exception be the right way of doing it? The timeout is 
essentially saying that removing the thread failed. So is it right to then 
remove it anyway?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565783921



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1843,6 +1843,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setNumPartitions(-1)
   .setReplicationFactor(-1)
   .setTopicConfigErrorCode(Errors.NONE.code)
+  } else {
+
result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(),
 Uuid.ZERO_UUID))

Review comment:
   I've added something like this to ZkAdminManager. Let me know if it 
makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


hachikuji commented on pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#issuecomment-768756102


   @twobeeb Before I merge, would you mind updating the PR description? Also, I 
will leave it to you to add the doc suggestion from @skaundinya15. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.

2021-01-27 Thread GitBox


gardnervickers commented on a change in pull request #9980:
URL: https://github.com/apache/kafka/pull/9980#discussion_r565781027



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -63,7 +63,7 @@ private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
 
   def empty(producerId: Long) = new ProducerStateEntry(producerId,
-batchMetadata = mutable.Queue[BatchMetadata](),
+batchMetadata = new mutable.Queue[BatchMetadata](5),

Review comment:
   Yes, good suggestion :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


skaundinya15 commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565772802



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<String, String> props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + 
".";
+boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   ```suggestion
   // By default, all source->target Herder combinations 
are created even if `x->y.enabled=false`
   // Unless `emit.heartbeats.enabled=false` or 
`x->y.emit.heartbeats.enabled=false`
   // Reason for this behavior: for a given replication 
flow A->B with heartbeats, 2 herders are required:
   // B->A for the MirrorHeartbeatConnector (emits 
heartbeats into A for monitoring replication health)
   // A->B for the MirrorSourceConnector (actual 
replication flow)
   if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```
   Looks good to me, just had a small tweak for the `B->A` comment. Thanks!
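
For context, a hedged example of the kind of configuration this logic interprets, expressed as the property map MirrorMakerConfig consumes (cluster names invented):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: an MM2-style config where only A->B replication is enabled but
// heartbeats stay on (the default). Under the logic above, herders get
// created for both A->B (MirrorSourceConnector) and B->A (the
// MirrorHeartbeatConnector that emits heartbeats into A).
static Map<String, String> exampleProps() {
    final Map<String, String> props = new HashMap<>();
    props.put("clusters", "A, B");
    props.put("A->B.enabled", "true");
    props.put("B->A.enabled", "false");
    props.put("emit.heartbeats.enabled", "true");
    return props;
}
```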





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-27 Thread GitBox


hachikuji commented on pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#issuecomment-768742967


   @chia7712 @ijuma Thanks for the comments thus far. This is ready for another 
look when you have time.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2021-01-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-3745:
--

Assignee: (was: Bill Bejeck)

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, kip
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.
> KIP-149: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner]
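
For context, a simplified sketch of the key-aware joiner shape the ticket asks for (Kafka Streams eventually added a `ValueJoinerWithKey` along these lines; the interface below is an illustration, not the exact API):

```java
// Sketch: like ValueJoiner, but the (read-only) join key is passed in, so an
// ephemeral key no longer has to be threaded through the values by hand.
@FunctionalInterface
interface ValueJoinerWithKey<K, V1, V2, VR> {
    VR apply(K readOnlyKey, V1 value1, V2 value2);
}

class JoinExample {
    // The joined value can now include the key directly.
    final ValueJoinerWithKey<String, Long, Long, String> joiner =
        (key, left, right) -> key + ":" + (left + right);
}
```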



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2021-01-27 Thread GitBox


mjsax commented on pull request #9420:
URL: https://github.com/apache/kafka/pull/9420#issuecomment-768709672


   @dongjinleekr -- the PR shows merge conflicts. Can you rebase once more? 
Sorry about that.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2021-01-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273249#comment-17273249
 ] 

Matthias J. Sax commented on KAFKA-6520:


I am wondering if https://issues.apache.org/jira/browse/KAFKA-10866 (merged 
recently) is something we could exploit to implement a DISCONNECTED state? The 
new metadata contains a `receivedTimestamp` field, and thus we could track the 
time difference between "now" and the last received fetch response.
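
As a rough sketch of that idea (all names here are hypothetical; this is not an existing Streams API):

```java
// Sketch: derive a DISCONNECTED-ish signal from the receive timestamp of the
// last fetch response, as carried by the new ConsumerRecords metadata.
// "lastReceivedMs" would be updated whenever a response arrives.
static boolean looksDisconnected(final long lastReceivedMs,
                                 final long nowMs,
                                 final long maxSilenceMs) {
    // If we have not heard from the brokers for longer than the configured
    // threshold, report the client as disconnected.
    return nowMs - lastReceivedMs > maxSilenceMs;
}
```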

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which leads me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded at the `connect()` and `pollSelectionKeys()` functions, upon capturing 
> the IOException / RuntimeException which indicates the connection was disconnected.
>  * And then users of Consumer / Streams can monitor this metric, which 
> normally will only have close-to-zero values given transient 
> disconnects; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12220) Replace PowerMock by Mockito

2021-01-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273244#comment-17273244
 ] 

Chia-Ping Tsai commented on KAFKA-12220:


[~ijuma] How about splitting the PR by package? 

||package||classes||
|org.apache.kafka.connect.runtime.standalone|1|
|org.apache.kafka.connect.runtime.distributed|3|
|org.apache.kafka.connect.runtime.errors|2|
|org.apache.kafka.connect.runtime.rest|3|
|org.apache.kafka.connect.util|3|
|org.apache.kafka.connect.storage|4|
|org.apache.kafka.connect.runtime|9|
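
For reference, the flavor of change each of those classes needs, as a hedged sketch (the collaborator interface is invented; real tests would mock their actual dependencies):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.List;

class MockitoMigrationSketch {

    // A made-up collaborator, standing in for whatever each test mocks today.
    interface ConnectorRegistry {
        List<String> connectorNames();
    }

    // Sketch: PowerMock's expect/replay/verify lifecycle collapses into
    // plain Mockito stubbing plus targeted verification.
    void example() {
        final ConnectorRegistry registry = mock(ConnectorRegistry.class);
        when(registry.connectorNames()).thenReturn(Collections.singletonList("c1"));

        // ... exercise the code under test ...

        verify(registry).connectorNames();
    }
}
```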


> Replace PowerMock by Mockito
> 
>
> Key: KAFKA-12220
> URL: https://issues.apache.org/jira/browse/KAFKA-12220
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We are migrating project from junit 4 to junit 5 (KAFKA-7339). PowerMock, 
> however, does not support junit 5 totally 
> (https://github.com/powermock/powermock/issues/830). Hence, we ought to 
> replace PowerMock by Mockito before migrating to junit 5 since rewriting all 
> tests which are depending on PowerMock can bring a bunch of changes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

2021-01-27 Thread GitBox


mjsax commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-768685944


   Thanks @lct45!
   
   For reference: https://github.com/apache/kafka/pull/9951 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 merged pull request #9981:
URL: https://github.com/apache/kafka/pull/9981


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 commented on pull request #9981:
URL: https://github.com/apache/kafka/pull/9981#issuecomment-768681359


   Build passed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565746851



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,8 +783,27 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
  final Time time) throws StreamsException {
 this.config = config;
 this.time = time;
+
+this.internalTopologyBuilder = internalTopologyBuilder;
+internalTopologyBuilder.rewriteTopology(config);
+
+// sanity check to fail-fast in case we cannot build a 
ProcessorTopology due to an exception
+taskTopology = internalTopologyBuilder.buildTopology();
+globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+
+final boolean hasGlobalTopology = globalTaskTopology != null;
+final boolean hasPersistentStores = 
taskTopology.hasPersistentLocalStore() ||
+(hasGlobalTopology && 
globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, 
hasPersistentStores);
+processId = stateDirectory.getProcessId();

Review comment:
   This is the only real change in the constructor, but I had to move a few 
things around and tried to organize them as I went





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565741567



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }

Review comment:
   One last thing, can you add a version of the 
`shouldRemoveStreamThread()` test that uses static membership? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565740953



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (groupInstanceID.isPresent()) {

Review comment:
   Yeah I think that makes sense here





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565731661



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -319,9 +319,9 @@ private void prepareStreamThread(final StreamThread thread, 
final boolean termin
 StreamThread.State.PARTITIONS_ASSIGNED);
 return null;
 }).anyTimes();
+
EasyMock.expect(thread.getGroupInstanceID()).andReturn(Optional.empty()).anyTimes();

Review comment:
   ```suggestion
   
EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = 
kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> 
kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   It's a bit weird to test this by passing in a negative timeout, but I 
don't have any good ideas for forcing it to exceed the timeout.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {

Review comment:
   ```suggestion
   public void shouldNotRemoveStreamThreadWithinTimeout() throws Exception {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hm actually now that I think about it, we should probably continue with 
the cleanup to leave the 

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565734101



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+try {
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs 
- begin, TimeUnit.MILLISECONDS);
+} catch (final 
java.util.concurrent.TimeoutException e) {

Review comment:
   We should. And I think maybe we should log the original stack trace.
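
Something like the following, as a sketch of what that catch block could do (the surrounding fields come from the quoted diff; the `groupInstanceId()` accessor and `remainingTimeMs` are assumptions):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

try {
    removeMembersFromConsumerGroupResult.memberResult(memberToRemove)
        .get(remainingTimeMs, TimeUnit.MILLISECONDS);
} catch (final java.util.concurrent.TimeoutException e) {
    // Log with the original exception attached so its stack trace is kept,
    // then translate into the Kafka TimeoutException the public API uses.
    log.error("Could not remove static member {} from the consumer group in time",
              memberToRemove.groupInstanceId(), e);
    throw new org.apache.kafka.common.errors.TimeoutException(
        "Static member was not removed from the consumer group in time", e);
} catch (final InterruptedException | ExecutionException e) {
    throw new RuntimeException("Failed to remove static member", e); // sketch only
}
```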





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565732778



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws 
TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
 if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = 
streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (groupInstanceID.isPresent()) {

Review comment:
   Ok, so we just do something like 
`if (groupInstanceID.isPresent() && !streamThread.getName().equals(Thread.currentThread().getName()))` 
when deciding whether to remove it from the group?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei merged pull request #9836:
URL: https://github.com/apache/kafka/pull/9836


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-768664736


   Flaky test failures:
   
   ```
   Build / JDK 11 / 
org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate()
   Build / JDK 11 / 
org.apache.kafka.clients.producer.KafkaProducerTest.testHeadersWithExtendedClasses()
   Build / JDK 15 / 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
   ```
   
   The most concerning one is the FetcherTest, but it's also failing on trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.

2021-01-27 Thread GitBox


satishd commented on a change in pull request #9980:
URL: https://github.com/apache/kafka/pull/9980#discussion_r565730154



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -63,7 +63,7 @@ private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
 
   def empty(producerId: Long) = new ProducerStateEntry(producerId,
-batchMetadata = mutable.Queue[BatchMetadata](),
+batchMetadata = new mutable.Queue[BatchMetadata](5),

Review comment:
   minor: you may want to have it as `new 
mutable.Queue[BatchMetadata](NumBatchesToRetain)` instead of hardcoding it 
directly. 
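
The general point, in a hedged Java sketch (the Kafka code above is Scala; this just illustrates sizing a queue by the named retention constant rather than a magic number):

```java
import java.util.ArrayDeque;
import java.util.Queue;

class BatchMetadataSketch {
    // Number of batch-metadata entries retained per producer.
    static final int NUM_BATCHES_TO_RETAIN = 5;

    // Sizing the queue by the named constant keeps the capacity and the
    // retention policy in one place; changing one cannot strand the other.
    final Queue<Long> batchMetadata = new ArrayDeque<>(NUM_BATCHES_TO_RETAIN);
}
```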





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565720574



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {

Review comment:
   We generally don't explicitly make this part of the API, and just inform 
users through the javadocs as you've done
   ```suggestion
    public Optional<String> removeStreamThread(final Duration timeout) {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The length of time to wait for the thread to shut down
+ * @throws TimeoutException if the thread does not stop in time

Review comment:
   ```suggestion
* @throws org.apache.kafka.common.errors.TimeoutException if the thread 
does not stop in time
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -88,9 +91,11 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.errors.TimeoutException;

Review comment:
   nit: move the import to the other `o.a.k.*` imports

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 

[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-01-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273206#comment-17273206
 ] 

Matthias J. Sax commented on KAFKA-10847:
-

Thanks [~spena] – overall, using a second store might be the simplest solution, 
and if we can get some perf results we can make a better decision on whether 
the performance is acceptable or not.

The only thing I tend to object to is the usage of _wall-clock_ time punctuation, 
because it would introduce non-determinism. And if we use stream-time 
punctuations, we could even avoid punctuations altogether, and "piggy-back" 
emitting left/outer join results each time we process an input record.

Also take into account the grace period, ie, we should only emit left/outer 
join results after a window closes (not when a window ends): window close = 
window end + grace period.
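
To pin down the timing, a small sketch of the gating condition (method and parameter names assumed; "stream time" means the max observed record timestamp):

```java
// A window may only emit its left/outer join results once it has closed:
// windowClose = windowEnd + gracePeriod. Before that point a matching
// record could still arrive and turn the pending result into an inner join.
static boolean windowClosed(final long windowEndMs,
                            final long gracePeriodMs,
                            final long streamTimeMs) {
    return streamTimeMs >= windowEndMs + gracePeriodMs;
}
```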

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
>
> KafkaStreams follows an eager execution model, ie, it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record what will be an 
> inner-join result, might produce a eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join, to not emit eager left/outer 
> join result, but instead delay the emission of such result after the window 
> grace period passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565722046



##
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+val destinationId = 2
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))

Review comment:
   Can we move this to `RaftConfigTest`? It's not really part of the 
behavior of `KafkaNetworkChannel`.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,7 +87,48 @@
 private final int electionBackoffMaxMs;
 private final int fetchTimeoutMs;
 private final int appendLingerMs;
-private final Map<Integer, InetSocketAddress> voterConnections;
+private final Map<Integer, AddressSpec> voterConnections;
+
+public static abstract class AddressSpec {
+   public abstract InetSocketAddress address();

Review comment:
   Do we need this in the abstract class? I was thinking we would only be 
able to access `InetSocketAddress` if the type is `InetAddressSpec`. Otherwise 
the type protection from `AddressSpec` loses its bite.

##
File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
##
@@ -139,6 +145,25 @@ class KafkaNetworkChannelTest {
 }
   }
 
+  @Test
+  def testNonRoutableAddressUpdateRequest(): Unit = {
+val destinationId = 2
+assertThrows(classOf[IllegalArgumentException],
+  () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0)))
+
+// Update channel with a valid endpoint

Review comment:
   Not sure there's much value in the rest of this test. Seems effectively 
the same as `testSendAndReceiveOutboundRequest`.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -76,7 +87,48 @@
 private final int electionBackoffMaxMs;
 private final int fetchTimeoutMs;
 private final int appendLingerMs;
-private final Map<Integer, InetSocketAddress> voterConnections;
+private final Map<Integer, AddressSpec> voterConnections;
+
+public static abstract class AddressSpec {
+   public abstract InetSocketAddress address();
+
+@Override
+public boolean equals(Object obj) {
+if (this == obj) {
+return true;
+}
+
+if (obj == null || getClass() != obj.getClass()) {
+return false;
+}
+
+final AddressSpec that = (AddressSpec) obj;
+return that.address().equals(address());
+}
+}
+
+public static class InetAddressSpec extends AddressSpec {
+private final InetSocketAddress address;
+
+public InetAddressSpec(InetSocketAddress address) {
+if (address.equals(UNROUTABLE_ADDRESS)) {
+throw new IllegalArgumentException("Address not routable");
+}
+this.address = address;
+}
+
+@Override
+public InetSocketAddress address() {
+return address;
+}
+}
+
+public static class UnknownAddressSpec extends AddressSpec {

Review comment:
   A common pattern for classes like this without any state is to create a 
static instance.
   ```java
 public static final UnknownAddressSpec INSTANCE = new UnknownAddressSpec();
   ```

##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -118,9 +119,20 @@ class KafkaRaftManager[T](
   private val raftIoThread = new RaftIoThread(raftClient)
 
   def startup(): Unit = {
+// Update the voter endpoints (if valid) with what's in RaftConfig
+val voterAddresses: util.Map[Integer, AddressSpec] = 
raftConfig.quorumVoterConnections
+for (voterAddressEntry <- voterAddresses.entrySet.asScala) {
+  voterAddressEntry.getValue match {
+case spec: InetAddressSpec => {
+  netChannel.updateEndpoint(voterAddressEntry.getKey, spec)
+}
+case invalid: AddressSpec => {
+  logger.warn(s"Skipping channel update for destination ID: 
${voterAddressEntry.getKey} " +

Review comment:
   This could be `info` I think in the case of `UnknownAddressSpec`. It is 
expected behavior to skip the update. We could add a third case for unexpected 
`AddressSpec` types.
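
   For illustration, the three-way dispatch could look like this in Java (a
   sketch that assumes the surrounding names `voterAddresses`, `netChannel`,
   and `log` from the diff above; the PR's RaftManager is Scala):
   ```java
   for (Map.Entry<Integer, RaftConfig.AddressSpec> entry : voterAddresses.entrySet()) {
       RaftConfig.AddressSpec spec = entry.getValue();
       if (spec instanceof RaftConfig.InetAddressSpec) {
           netChannel.updateEndpoint(entry.getKey(), (RaftConfig.InetAddressSpec) spec);
       } else if (spec instanceof RaftConfig.UnknownAddressSpec) {
           // Expected when the endpoint is not yet known, so info is enough.
           log.info("Skipping channel update for destination ID {}: address unknown", entry.getKey());
       } else {
           // Third case: an AddressSpec subtype we did not anticipate.
           log.warn("Unexpected AddressSpec {} for destination ID {}", spec, entry.getKey());
       }
   }
   ```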





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twobeeb commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565704218



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + 
".";
+boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   Thanks for your review @skaundinya15.
   I'm having a hard time phrasing this properly; suggestions would be welcome.
   Is this proposed comment aligned with what you had in mind?
   ```suggestion
   // By default, all source->target Herder combinations 
are created even if `x->y.enabled=false`
   // Unless `emit.heartbeats.enabled=false` or 
`x->y.emit.heartbeats.enabled=false`
   // Reason for this behavior: for a given replication 
flow A->B with heartbeats, 2 herders are required :
   // B->A for the MirrorHeartbeatConnector (emits 
heartbeats into A)
   // A->B for the MirrorSourceConnector (actual 
replication flow)
   if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] gharris1727 commented on a change in pull request #9987: KAFKA-10895: Gracefully handle invalid JAAS configs

2021-01-27 Thread GitBox


gharris1727 commented on a change in pull request #9987:
URL: https://github.com/apache/kafka/pull/9987#discussion_r565694653



##
File path: 
connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java
##
@@ -80,7 +95,8 @@ public void close() throws IOException {
 
 @Override
 public void configure(Map configs) {
-
+// If we failed to retrieve a JAAS configuration during startup, throw 
that exception now
+CONFIGURATION.get();

Review comment:
   Could you add a test which confirms that we're propagating the exception 
here? At the moment, the test verifies that the wrapping method works, but 
doesn't verify that it's used by the rest extension during the loading phase.
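
   A hedged sketch of such a test, assuming the extension can be constructed
   around the failing supplier (the constructor shown here is hypothetical;
   the real wiring may need a visible hook):
   ```java
   @Test
   public void testBadJaasConfigurationSurfacesDuringConfigure() {
       SecurityException jaasError = new SecurityException(new IOException("Bad JAAS config is bad"));
       // Hypothetical constructor that accepts the configuration supplier:
       BasicAuthSecurityRestExtension extension = new BasicAuthSecurityRestExtension(() -> {
           throw jaasError;
       });
       // configure() should rethrow the deferred startup failure.
       ConnectException thrown = assertThrows(ConnectException.class,
           () -> extension.configure(Collections.emptyMap()));
       assertEquals(jaasError, thrown.getCause());
   }
   ```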

##
File path: 
connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java
##
@@ -63,4 +69,25 @@ public void testJaasConfigurationNotOverwritten() {
 assertNotEquals(overwrittenConfiguration, 
jaasFilter.getValue().configuration,
 "Overwritten JAAS configuration should not be used by basic auth 
REST extension");
 }
+
+@Test
+public void testBadJaasConfiguration() {
+SecurityException jaasConfigurationException = new 
SecurityException(new IOException("Bad JAAS config is bad"));
+Supplier<Configuration> configuration = 
BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
+throw jaasConfigurationException;
+});
+
+ConnectException thrownException = 
assertThrows(ConnectException.class, configuration::get);
+assertEquals(jaasConfigurationException, thrownException.getCause());
+}
+
+@Test
+public void testGoodJaasConfiguration() {
+Configuration mockConfiguration = EasyMock.mock(Configuration.class);

Review comment:
   The identity function could pass this test, but wouldn't have the 
behavior we need in the BasicAuthSecurityRestExtension. I wonder if there's a 
way to confirm that the mockConfiguration has been evaluated prior to calling 
`get()` on the returned supplier.
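
   One way to pin that down (sketch): wrap a delegate that records whether it
   ran, and assert that it already ran before `get()` is ever called. The
   identity function would fail the first assertion.
   ```java
   @Test
   public void testConfigurationEvaluatedEagerly() {
       AtomicBoolean delegateRan = new AtomicBoolean(false);
       Configuration mockConfiguration = EasyMock.mock(Configuration.class);
       Supplier<Configuration> wrapped = BasicAuthSecurityRestExtension.initializeConfiguration(() -> {
           delegateRan.set(true);
           return mockConfiguration;
       });
       // Eager initialization means the delegate ran before any get() call.
       assertTrue(delegateRan.get());
       assertEquals(mockConfiguration, wrapped.get());
   }
   ```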





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-27 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating 
point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot 
be changed.
+ * We handle divergences between the current state and historical state by 
copying a
+ * reference to elements that have been deleted or overwritten into the 
snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two 
tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This 
design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring 
us to copy
+ * references into multiple snapshot tiers sometimes when altering the current 
state.
+ * In general, we don't expect there to be many snapshots at any given point 
in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from 
BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but 
since Java
+ * doesn't have value types, subclassing is the only way to avoid another 
pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object 
itself.
+ * We access it by looking up our object reference in the Snapshot's 
IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by 
deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the 
element was
+ * overwritten or removed from a later tier.  If there are no changes between 
then and
+ * now, there is no data at all stored for the tier.  We don't even store a 
hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *Revertable   BaseHashTable
+ *  ↑  ↑
+ *   SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *   ↑ ↑
+ *   TimelineHashSet   TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The 
interface is
+ * pretty bare-bones since this class is not intended to be used directly by 
end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and 
iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles 
the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set 
and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's 
necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, 
and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever 
mutate thte

Review comment:
   typo thte
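
   As an illustration of the two-tier read described in the javadoc above
   (hypothetical simplified types, not the PR's API):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // The tier for epoch E holds values overwritten or deleted since E; a key
   // absent there is unchanged and is read from the current tier. Keys created
   // after the snapshot are ignored here for brevity.
   class TwoTierLookup<K, V> {
       final Map<K, V> currentTier = new HashMap<>();
       final Map<Long, Map<K, V>> snapshotTiers = new HashMap<>(); // epoch -> tier

       V read(long epoch, K key) {
           Map<K, V> tier = snapshotTiers.get(epoch);
           if (tier != null && tier.containsKey(key)) {
               return tier.get(key); // value as of the snapshot
           }
           return currentTier.get(key); // unchanged since the snapshot
       }
   }
   ```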

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by 

[GitHub] [kafka] C0urante opened a new pull request #9987: KAFKA-10895: Gracefully handle invalid JAAS configs

2021-01-27 Thread GitBox


C0urante opened a new pull request #9987:
URL: https://github.com/apache/kafka/pull/9987


   Follow-up to https://github.com/apache/kafka/pull/9806
   
   If an invalid JAAS config is present on the worker, invoking 
`Configuration::getConfiguration` throws an exception. The changes from #9806 
cause that exception to be thrown during plugin scanning, which causes the 
worker to fail even if it is not configured to use the basic auth extension at 
all.
   
   This follow-up handles invalid JAAS configurations more gracefully, and only 
throws them if the worker is actually configured to use the basic auth 
extension, at the time that the extension is instantiated and configured.
   
   Two unit tests are added to test the green-path and red-path behavior of the 
extension when it encounters well-formed and ill-formed JAAS configurations, 
respectively.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673883



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -34,6 +34,7 @@ import scala.collection.mutable
 
 object KafkaNetworkChannel {
 
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment:
   Yea, good catch. The AddressSpec makes sense.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673491



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -36,7 +36,9 @@
 public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
 public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint 
information for " +
 "the set of voters in a comma-separated list of `{id}@{host}:{port}` 
entries. " +
-"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+"If the voter endpoints are not known at startup, a non-routable 
address can be provided instead." +

Review comment:
   Fair enough. I can move it there.
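
   For example (illustrative values only; the voter with id 1 has no known
   endpoint yet and uses the non-routable sentinel):
   ```java
   Properties props = new Properties();
   props.put(RaftConfig.QUORUM_VOTERS_CONFIG,
       "1@0.0.0.0:0,2@localhost:9093,3@localhost:9094");
   ```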





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673305



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -208,8 +209,9 @@ public KafkaRaftClient(
 int fetchMaxWaitMs,
 OptionalInt nodeId,
 LogContext logContext,
-Random random
-) {
+Random random,
+RaftConfig raftConfig
+) throws IOException {

Review comment:
   Yea. This is an artifact from the `quorumState.initialize` change. Since 
that's moved down to the `client.initialize`, we can remove it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565672756



##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map<Integer, InetSocketAddress> addressCache;

Review comment:
   Ack. I considered it. Figured it might be useful to have the endpoints 
for any future tests. Looks like the MockNetworkChannel doesn't test anything 
endpoint specific. Will remove





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #9986: JUnit extensions for integration tests

2021-01-27 Thread GitBox


mumrah opened a new pull request #9986:
URL: https://github.com/apache/kafka/pull/9986


   TBD
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling

2021-01-27 Thread GitBox


vvcephei commented on pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#issuecomment-768599882


   There was a merge conflict with trunk. Rebased and pushed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


skaundinya15 commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565656131



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + 
".";
+boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   Thanks for the explanation @twobeeb, this makes sense. It would be good 
to add some comments explaining this in the code as this isn't immediately 
obvious. Other than that it looks good to me overall.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12169) Consumer can not know paritions chage when client leader restart with static membership protocol

2021-01-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273158#comment-17273158
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12169:


hey [~boyang] any thoughts?

> Consumer can not know paritions chage when client leader restart with static 
> membership protocol
> 
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>
> Background: 
>  Kafka consumer services run with static membership and the cooperative 
> rebalance protocol on Kubernetes, and services often restart for operational 
> reasons. When we added partitions from 1000 to 2000 for the topic and the 
> client leader restarted with an unknown member id at the same time, we found 
> the consumers did not trigger a rebalance and still consumed 1000 partitions
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565643848



##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map<Integer, InetSocketAddress> addressCache;

Review comment:
   On second thought, it seems worth keeping this as a set. It helps us 
ensure that no requests are sent to non-voters. I would just change the field 
name to `Set<Integer> voterIds`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565634630



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -34,6 +34,7 @@ import scala.collection.mutable
 
 object KafkaNetworkChannel {
 
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment:
   Let me suggest an alternative for the sake of argument. Currently, 
`RaftConfig.parseVoterConnections` returns `Map<Integer, InetSocketAddress>`. 
This works for the case we're interested in, but there is a risk of our 
sentinel non-routable address leaking into unexpected cases (a common source of 
bugs in Kafkaland). Alternatively, what if we add something like this to 
`RaftConfig`:
   
   ```java
   public class RaftConfig {
 ...
   
  public Map<Integer, AddressSpec> quorumVoterConnections();
   
 public static interface AddressSpec {
 }
   
 public static class InetAddressSpec implements AddressSpec {
   final InetSocketAddress address;
 }
   
 public static class UnknownAddressSpec implements AddressSpec {
 }
   }
   ```
   
   The advantage is that this lets the type checker help us ensure that we are 
checking for a sentinel. 

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -208,8 +209,9 @@ public KafkaRaftClient(
 int fetchMaxWaitMs,
 OptionalInt nodeId,
 LogContext logContext,
-Random random
-) {
+Random random,
+RaftConfig raftConfig
+) throws IOException {

Review comment:
   Is anything in here throwing `IOException`?

##
File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
##
@@ -25,20 +25,25 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class MockNetworkChannel implements NetworkChannel {
 private final AtomicInteger correlationIdCounter;
+private final Map<Integer, InetSocketAddress> addressCache;

Review comment:
   I don't think we are using the address here. Can we use `Set<Integer>`? 
Potentially we could even get rid of this collection. It was more useful when 
the RaftClient itself was expected to discover the voter endpoints.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -36,7 +36,9 @@
 public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
 public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint 
information for " +
 "the set of voters in a comma-separated list of `{id}@{host}:{port}` 
entries. " +
-"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+"If the voter endpoints are not known at startup, a non-routable 
address can be provided instead." +

Review comment:
   Perhaps we can keep this as an internal feature for now. It is not 
something that a user would be able to leverage. We can document it in the 
class javadoc perhaps. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565640687



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new 
MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+try {
+
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs 
- begin, TimeUnit.MILLISECONDS);
+} catch (final 
java.util.concurrent.TimeoutException e) {

Review comment:
   have to make this a Kafka `TimeoutException` (`org.apache.kafka.common.errors`)
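
   A sketch of that translation, reusing the names from the diff above (how
   the `ExecutionException` arm should look was still being discussed):
   ```java
   try {
       removeMembersFromConsumerGroupResult.memberResult(memberToRemove)
           .get(timeoutMs - begin, TimeUnit.MILLISECONDS);
   } catch (final java.util.concurrent.TimeoutException e) {
       // Rethrow as Kafka's own TimeoutException to match the rest of the API.
       throw new org.apache.kafka.common.errors.TimeoutException(
           "Static member was not removed from the group in time");
   } catch (final InterruptedException | ExecutionException e) {
       throw new IllegalStateException("Failed to remove static member", e);
   }
   ```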





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565635667



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));

Review comment:
   Sounds good. How should you handle the `ExecutionException`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration

2021-01-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10716:
---
Priority: Critical  (was: Major)

> Streams processId is unstable across restarts resulting in task mass migration
> --
>
> Key: KAFKA-10716
> URL: https://issues.apache.org/jira/browse/KAFKA-10716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> The new high availability feature of KIP-441 relies on deterministic 
> assignment to produce an eventually-stable assignment. The 
> HighAvailabilityTaskAssignor assigns tasks based on the unique processId 
> assigned to each client, so if the same set of Kafka Streams applications 
> participate in a rebalance it should generate the same task assignment every 
> time.
> Unfortunately the processIds aren't stable across restarts. We generate a 
> random UUID in the KafkaStreams constructor, so each time the process starts 
> up it would be assigned a completely different processId. Unless this new 
> processId happens to be in exactly the same order as the previous one, a 
> single bounce or crash/restart can result in a large scale shuffling of tasks 
> based on a completely different eventual assignment.
> Ultimately we should fix this via KAFKA-10121, but that's a nontrivial 
> undertaking and this bug merits some immediate relief if we don't intend to 
> tackle the larger problem in the upcoming releases 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration

2021-01-27 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10716:
---
Fix Version/s: 2.6.2

> Streams processId is unstable across restarts resulting in task mass migration
> --
>
> Key: KAFKA-10716
> URL: https://issues.apache.org/jira/browse/KAFKA-10716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> The new high availability feature of KIP-441 relies on deterministic 
> assignment to produce an eventually-stable assignment. The 
> HighAvailabilityTaskAssignor assigns tasks based on the unique processId 
> assigned to each client, so if the same set of Kafka Streams applications 
> participate in a rebalance it should generate the same task assignment every 
> time.
> Unfortunately the processIds aren't stable across restarts. We generate a 
> random UUID in the KafkaStreams constructor, so each time the process starts 
> up it would be assigned a completely different processId. Unless this new 
> processId happens to be in exactly the same order as the previous one, a 
> single bounce or crash/restart can result in a large scale shuffling of tasks 
> based on a completely different eventual assignment.
> Ultimately we should fix this via KAFKA-10121, but that's a nontrivial 
> undertaking and this bug merits some immediate relief if we don't intend to 
> tackle the larger problem in the upcoming releases 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565628682



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   that works





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] aloknnikhil opened a new pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil opened a new pull request #9985:
URL: https://github.com/apache/kafka/pull/9985


   With KIP-595, we expect the RaftConfig to specify the quorum voter endpoints 
upfront on startup. In the general case, this works fine. However, for testing 
we need a lazier approach that discovers the other voters in the quorum 
after startup (i.e. after the controller port binds). This approach also 
lends itself well to cases where an observer discovers the voter endpoints 
from, say, a `DescribeQuorum` event.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565628241



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);

Review comment:
   good idea, I reworked that a bit





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565622524



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final 
java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, 
long timeoutMs) {
+if (timeoutMs < 0) {
+timeoutMs = 0;
+} else if (timeoutMs == 0) {
+timeoutMs = Long.MAX_VALUE;

Review comment:
   ah yeah, I had to fix this when I was writing my test
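
   One possible fixed shape (sketch; `stateLock` and `state` stand in for the
   thread's real state monitor):
   ```java
   public boolean waitOnThreadState(final StreamThread.State targetState, final long timeoutMs) {
       final long deadline = System.currentTimeMillis() + timeoutMs;
       synchronized (stateLock) {
           while (state != targetState) {
               final long remaining = deadline - System.currentTimeMillis();
               if (remaining <= 0) {
                   return false; // timed out before reaching targetState
               }
               try {
                   stateLock.wait(remaining);
               } catch (final InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return false;
               }
           }
           return true;
       }
   }
   ```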





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565622163



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -91,6 +93,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

Review comment:
   I did not know that. good catch





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565621384



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID() {

Review comment:
   Either way we need to deal with it. I thought it would be easier to just 
do it once, but it is probably better practice to handle it later. I will 
change it to Optional.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


bbejeck commented on pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#issuecomment-768565772


   Java 11 and 15 passed
   
   Java 8 failures unrelated
   ```
   org.apache.kafka.clients.consumer.KafkaConsumerTest.testCloseWithTimeUnit()
   
org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate()
   kafka.api.TransactionsBounceTest.testWithGroupMetadata()
   ```
   Kicking off tests again to try and get a green build



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565611900



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -91,6 +93,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

Review comment:
   There's actually a kafka-specific version of `TimeoutException` that you 
should use to keep in line with other kafka APIs. It's 
`org.apache.kafka.common.errors.TimeoutException`

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final 
long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
-
streamThread.waitOnThreadState(StreamThread.State.DEAD);
+
streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));
+
adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
 new RemoveMembersFromConsumerGroupOptions(membersToRemove));
+return Optional.of(streamThread.getName());
+}
+}
+}
+log.warn("There are no threads eligible for removal");
+} else {
+log.warn("Cannot remove a stream thread when Kafka Streams client 
is in state  " + state());
+}
+return Optional.empty();
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this 
Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not 
specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches 
in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream 
threads equals the total
+ * cache size specified in configuration {@link 
StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread 
could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws 
TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, 
"timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+if (isRunningOrRebalancing()) {
+synchronized (changeThreadCount) {
+// make a copy of threads to avoid holding lock
+for (final StreamThread streamThread : new 
ArrayList<>(threads)) {
+if (streamThread.isAlive() && 
(!streamThread.getName().equals(Thread.currentThread().getName())
+|| threads.size() == 1)) {
+streamThread.shutdown();
+if 
(!streamThread.getName().equals(Thread.currentThread().getName())) {
+if 
(!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + 
" did not stop in the allotted time");
+throw new TimeoutException("Thread " + 
streamThread.getName() + " did not stop in the allotted time");
+}
+}
+threads.remove(streamThread);
+final long cacheSizePerThread = 
getCacheSizePerThread(threads.size());
+resizeThreadCache(cacheSizePerThread);
+Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(new 
MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   I'm not sure how `removeMembersFromConsumerGroup` would behave if you 
passed in `""` as the `group.instance.id`, do you know? If not then let's just 
be safe and check what `streamThread.getGroupInstanceID()` returns, and skip 
this call if there is no group.instance.id (ie if not static)
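
   A sketch of that guard (the updated patch quoted earlier in this digest
   takes a similar shape):
   ```java
   final Optional<String> groupInstanceId = streamThread.getGroupInstanceID();
   if (groupInstanceId.isPresent()) { // only static members have an instance id
       final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceId.get());
       adminClient.removeMembersFromConsumerGroup(
           config.getString(StreamsConfig.APPLICATION_ID_CONFIG),
           new RemoveMembersFromConsumerGroupOptions(Collections.singletonList(memberToRemove)));
   }
   ```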

##
File path: 

[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


cadonna commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565616503



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID(){
+return mainConsumer.groupMetadata().groupInstanceId().orElse("");

Review comment:
   I would do it the same way.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


cadonna commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565615976



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + 
taskManager.toString(indent);
 }
 
+public String getGroupInstanceID() {

Review comment:
   Why not an `Optional`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-27 Thread GitBox


rajinisivaram commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r565615638



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 Set.empty[MetadataResponseTopic]
   else
 unauthorizedForDescribeTopics.map(topic =>
-  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
false, util.Collections.emptyList()))
+  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, 
Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment:
   Good point. If not authorized for describe when using topic ids, we need 
to make sure we don't return the topic or information about existence of a 
topic, i.e. we can't return TOPIC_AUTHORIZATION_FAILED. Perhaps 
UNKNOWN_TOPIC_ID would be more suitable.
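
   Sketched (illustrative, not KafkaApis code):
   ```java
   // Answer as if the topic id does not exist, so an unauthorized caller
   // cannot probe which topic ids are real.
   Errors error = authorizedToDescribe
       ? Errors.NONE
       : Errors.UNKNOWN_TOPIC_ID; // rather than TOPIC_AUTHORIZATION_FAILED
   ```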





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9944:
URL: https://github.com/apache/kafka/pull/9944#discussion_r565612146



##
File path: 
clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
##
@@ -186,23 +241,37 @@ public String toString() {
  * incremental fetch requests (see below).
  */
 private LinkedHashMap<TopicPartition, PartitionData> next;
+private Map<String, Uuid> topicIds;
+private Map<Uuid, String> topicNames;
+private Map<String, Integer> partitionsPerTopic;
 private final boolean copySessionPartitions;
 
 Builder() {
 this.next = new LinkedHashMap<>();
+this.topicIds = new HashMap<>();
+this.topicNames = new HashMap<>();
+this.partitionsPerTopic = new HashMap<>();
 this.copySessionPartitions = true;
 }
 
 Builder(int initialSize, boolean copySessionPartitions) {
 this.next = new LinkedHashMap<>(initialSize);
+this.topicIds = new HashMap<>(initialSize);
+this.topicNames = new HashMap<>(initialSize);
+this.partitionsPerTopic = new HashMap<>(initialSize);
 this.copySessionPartitions = copySessionPartitions;
 }
 
 /**
  * Mark that we want data from this partition in the upcoming fetch.
  */
-public void add(TopicPartition topicPartition, PartitionData data) {
-next.put(topicPartition, data);
+public void add(TopicPartition topicPartition, Uuid id, PartitionData 
data) {
+if (next.put(topicPartition, data) == null)
+partitionsPerTopic.merge(topicPartition.topic(), 1, (prev, 
next) -> prev + next);

Review comment:
   I think I may want to do this in a simpler way. I want to keep track of 
whether we have IDs for all the topics, and I'm not sure if there is a better 
way to figure out when a topic is no longer in a session besides checking all 
the topic partitions.
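
   One simple shape for that bookkeeping (sketch): reference-count partitions
   per topic, so a topic leaves the session exactly when its count reaches zero.
   ```java
   final Map<String, Integer> partitionsPerTopic = new HashMap<>();

   void addPartition(TopicPartition tp) {
       partitionsPerTopic.merge(tp.topic(), 1, Integer::sum);
   }

   void removePartition(TopicPartition tp) {
       // computeIfPresent removes the mapping when the function returns null.
       partitionsPerTopic.computeIfPresent(tp.topic(), (t, n) -> n == 1 ? null : n - 1);
   }
   ```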





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565607533



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) {
+if (timeoutMs < 0) {

Review comment:
   This is for the non-timeout uses: a negative value means wait with no 
deadline.
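
   For illustration, a self-contained Java sketch of that sentinel convention 
(hypothetical helper, not the actual StreamThread code): a negative timeout 
blocks with no deadline, a non-negative one bounds the wait and reports 
whether the target state was reached.

   import java.util.function.Supplier;

   final class ThreadStateWaiter {
       // timeoutMs < 0 is the sentinel for "wait indefinitely".
       static boolean waitOnThreadState(final Object lock,
                                        final Supplier<Boolean> inTargetState,
                                        final long timeoutMs) throws InterruptedException {
           final long deadline = System.currentTimeMillis() + timeoutMs;
           synchronized (lock) {
               while (!inTargetState.get()) {
                   if (timeoutMs < 0) {
                       lock.wait();  // no deadline: block until notified
                   } else {
                       final long remaining = deadline - System.currentTimeMillis();
                       if (remaining <= 0) {
                           return false;  // deadline passed before reaching the state
                       }
                       lock.wait(remaining);
                   }
               }
               return true;
           }
       }
   }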

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
 }
 
+public String getGroupInstanceID(){
+return mainConsumer.groupMetadata().groupInstanceId().orElse("");

Review comment:
   It seems easier to get it from here than from the config. It looked like I 
might have to manipulate strings in that case.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1007,56 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+Collection<MemberToRemove> membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   I ended up getting the `group.instance.id` from the streamThread
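
   For context, a standalone sketch of what removing a static member amounts 
to via the public Admin API (the group id, instance id, and bootstrap servers 
below are placeholders): removing the thread's group.instance.id triggers an 
immediate rebalance instead of waiting for the session timeout to expire.

   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;
   import org.apache.kafka.clients.admin.MemberToRemove;
   import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

   public final class RemoveStaticMemberExample {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               admin.removeMembersFromConsumerGroup(
                   "my-streams-app",  // placeholder: the application.id / group id
                   new RemoveMembersFromConsumerGroupOptions(
                       Collections.singletonList(new MemberToRemove("thread-1-instance"))))
                   .all()
                   .get();  // block until the broker confirms the removal
           }
       }
   }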





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 opened a new pull request #9984:
URL: https://github.com/apache/kafka/pull/9984


   add timeout and static group rebalance to remove thread
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-768546193


   Most of those failures were known flaky tests, but one was an EasyMock 
error. I'm not able to repro it locally after a rebase, though. Rebased, 
pushed, and trying one more time to get a clean build.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
 results.add(new DeletableTopicResult()
-  .setName(topic))
+  .setName(name)
+  .setTopicId(topic.topicId()))
   }
   val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala)(_.name)
   results.forEach { topic =>
- if (!authorizedTopics.contains(topic.name))
+ val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null
+ val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
   Thinking on this more, I can simplify the line, especially if I make 
changes to the code above.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
 results.add(new DeletableTopicResult()
-  .setName(topic))
+  .setName(name)
+  .setTopicId(topic.topicId()))
   }
   val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala)(_.name)
   results.forEach { topic =>
- if (!authorizedTopics.contains(topic.name))
+ val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null
+ val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
   Thinking on this more, I can simplify the line. If the topic name is null, 
then we didn't have a valid topic ID.
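
   A hypothetical Java restatement of that check (the actual code is Scala in 
KafkaApis.scala; the class and method names here are illustrative):

   import org.apache.kafka.common.Uuid;

   // After the controller lookup, a non-zero topic ID with a null resolved
   // name means the ID did not match any known topic.
   final class DeleteTopicClassifier {
       static boolean unknownTopicId(Uuid topicId, String resolvedName) {
           return !Uuid.ZERO_UUID.equals(topicId) && resolvedName == null;
       }
   }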





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



