[GitHub] [kafka] xiao-penglei commented on pull request #9872: KAFKA-10759: ARM support for Kafka
xiao-penglei commented on pull request #9872: URL: https://github.com/apache/kafka/pull/9872#issuecomment-768859211 > @xiao-penglei : Thanks for the PR. Did the "Arm Build" get triggered in jenkins? I can't seem to find it. @junrao When you create a Jenkins pipeline, there are two ways to configure the Jenkinsfile source: use the Jenkinsfile from the repository, or use a static Jenkinsfile. I do not know how the Kafka Jenkins pipeline is configured because I do not have permission to view it. The "Arm Build" stage will only appear after the Jenkinsfile used by the Jenkins pipeline is modified. The Jenkinsfile change that I submitted was tested locally. Now we need to modify the Jenkinsfile used by the Jenkins pipeline. Maybe @ijuma can help us. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9689) Automatic broker version detection to initialize stream client
[ https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273379#comment-17273379 ] Boyang Chen commented on KAFKA-9689: I agree with A) since it is an internal struct. > Automatic broker version detection to initialize stream client > -- > > Key: KAFKA-9689 > URL: https://issues.apache.org/jira/browse/KAFKA-9689 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > Eventually we shall deprecate the flag to suppress EOS thread producer > feature, instead we take version detection approach on broker to decide which > semantic to use. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] xiao-penglei commented on a change in pull request #9872: KAFKA-10759: ARM support for Kafka
xiao-penglei commented on a change in pull request #9872: URL: https://github.com/apache/kafka/pull/9872#discussion_r565854978 ## File path: Jenkinsfile ## @@ -160,5 +160,23 @@ pipeline { } } } +stage("Arm Build") { + agent { label 'arm4' } + options { +timeout(time: 8, unit: 'HOURS') +timestamps() + } + environment { Review comment: @junrao Yes, I installed openjdk-8u252-b09, which is the same version as jdk_1.8_latest on Apache infra. I did this because I could not find a JDK version that runs on the ARM platform among the Jenkins tools provided by Apache infra ([infra jdk matrix](https://cwiki.apache.org/confluence/display/INFRA/JDK+Installation+Matrix)). I also installed maven-3.6.3, which is the same version as maven_3_latest on Apache infra. I will take time to make sure the JDK and Maven versions stay consistent with those of Apache infra. In most cases, the arm4 node is used only for Kafka ARM CI.
[GitHub] [kafka] showuon commented on pull request #9791: KAFKA-10873: ignore warning messages if connector/task start failed
showuon commented on pull request #9791: URL: https://github.com/apache/kafka/pull/9791#issuecomment-768830962 @kkonstantine , Thanks for the comments. So you mean this issue can be kept as is? Or should we find another way to implement it instead of the bookkeeping approach?
[jira] [Resolved] (KAFKA-10658) ErrantRecordReporter.report always return completed future even though the record is not sent to DLQ topic yet
[ https://issues.apache.org/jira/browse/KAFKA-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10658. Fix Version/s: 2.6.2 2.7.1 2.8.0 Resolution: Fixed > ErrantRecordReporter.report always return completed future even though the > record is not sent to DLQ topic yet > --- > > Key: KAFKA-10658 > URL: https://issues.apache.org/jira/browse/KAFKA-10658 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > This issue happens when both DLQ and error log are enabled. There is an > incorrect filter in the handling of multiple reports, and it results in the > uncompleted future being filtered out. Hence, users always receive a completed > future even though the record is still in the producer buffer.
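The failure mode described in KAFKA-10658 can be illustrated with a minimal Java sketch. This is not the Kafka Connect code: the class `ReportFutures` and both method names are hypothetical, and the `isDone` filter is only an assumed stand-in for the "incorrect filter" the issue mentions. The sketch shows how dropping a pending future before combining makes the combined future look complete while a send is still outstanding.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not Kafka's ErrantRecordReporter implementation.
final class ReportFutures {

    // Faulty pattern (assumed): only already-completed futures survive the filter,
    // so a pending DLQ send is silently dropped from the combined result.
    static CompletableFuture<Void> combineDroppingPending(List<CompletableFuture<Void>> reports) {
        List<CompletableFuture<Void>> kept = reports.stream()
            .filter(CompletableFuture::isDone)
            .collect(Collectors.toList());
        return CompletableFuture.allOf(kept.toArray(new CompletableFuture[0]));
    }

    // The combined future completes only once every individual report has completed.
    static CompletableFuture<Void> combineAll(List<CompletableFuture<Void>> reports) {
        return CompletableFuture.allOf(reports.toArray(new CompletableFuture[0]));
    }
}
```

With one completed future (standing in for the error log) and one pending future (standing in for the DLQ send), `combineDroppingPending` reports completion immediately, whereas `combineAll` stays pending until the second future completes.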
[GitHub] [kafka] chia7712 commented on pull request #9525: KAFKA-10658 ErrantRecordReporter.report always return completed futur…
chia7712 commented on pull request #9525: URL: https://github.com/apache/kafka/pull/9525#issuecomment-768828561 Merged to trunk, 2.7, and 2.6.
[GitHub] [kafka] chia7712 merged pull request #9525: KAFKA-10658 ErrantRecordReporter.report always return completed futur…
chia7712 merged pull request #9525: URL: https://github.com/apache/kafka/pull/9525
[GitHub] [kafka] g1geordie commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
g1geordie commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r565834861 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ## @@ -1004,10 +998,6 @@ public void testWithRecords(Args args) { } Review comment: It sounds like you changed `assume (condition)` to `if (condition) ... else ...` in all methods.
[jira] [Updated] (KAFKA-10658) ErrantRecordReporter.report always return completed future even though the record is not sent to DLQ topic yet
[ https://issues.apache.org/jira/browse/KAFKA-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-10658: --- Affects Version/s: 2.6.0 > ErrantRecordReporter.report always return completed future even though the > record is not sent to DLQ topic yet > --- > > Key: KAFKA-10658 > URL: https://issues.apache.org/jira/browse/KAFKA-10658 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > This issue happens when both DLQ and error log are enabled. There is an > incorrect filter in the handling of multiple reports, and it results in the > uncompleted future being filtered out. Hence, users always receive a completed > future even though the record is still in the producer buffer.
[jira] [Commented] (KAFKA-12169) Consumer can not know paritions chage when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273328#comment-17273328 ] Boyang Chen commented on KAFKA-12169: - In general, the leader should be able to detect a discrepancy between its remembered topic metadata and the broker-side metadata. I don't think we have any test case that covers both the topic partition change and the leader rejoin at the same time, so it's possible and needs some verification. > Consumer can not know paritions chage when client leader restart with static > membership protocol > > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > > Background: > Kafka consumer services run with static membership and the cooperative rebalance > protocol on Kubernetes, and the services often restart because of operations. When > we increased the topic's partitions from 1000 to 2000 and the client leader restarted > with an unknown member id at the same time, we found that the consumers did not > trigger a rebalance and still consumed only 1000 partitions
[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset
jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -66,32 +78,67 @@ class KafkaMetadataLog( if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") -val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords], - leaderEpoch = epoch, - origin = AppendOrigin.Coordinator) -new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") -}, appendInfo.lastOffset) +handleAndConvertLogAppendInfo( + log.appendAsLeader(records.asInstanceOf[MemoryRecords], +leaderEpoch = epoch, +origin = AppendOrigin.Coordinator + ) +) } override def appendAsFollower(records: Records): LogAppendInfo = { if (records.sizeInBytes == 0) throw new IllegalArgumentException("Attempt to append an empty record set") -val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords]) -new LogAppendInfo(appendInfo.firstOffset.getOrElse { - throw new KafkaException("Append failed unexpectedly") -}, appendInfo.lastOffset) + handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords])) + } + + private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = { +appendInfo.firstOffset match { + case Some(firstOffset) => +if (firstOffset.relativePositionInSegment == 0) { + // Assume that a new segment was created if the relative position is 0 + log.deleteOldSegments() +} +new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset) + case None => +throw new KafkaException(s"Append failed unexpectedly: $appendInfo") +} } override def lastFetchedEpoch: Int = { -log.latestEpoch.getOrElse(0) +log.latestEpoch.getOrElse { + latestSnapshotId.map { snapshotId => +val logEndOffset = endOffset().offset +if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) { + // Return the 
epoch of the snapshot when the log is empty + snapshotId.epoch +} else { + throw new KafkaException( +s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " + +s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " + +s"and the log start offset ($startOffset)" + ) +} + }.orElse(0) +} } override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = { val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch => - new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch) + if (oldestSnapshotId.isPresent() && +offsetAndEpoch.offset == oldestSnapshotId.get().offset && +offsetAndEpoch.leaderEpoch == leaderEpoch) { Review comment: First, thanks a lot for thinking through this code and providing such a detailed comment. This code is important to get right. > the requested epoch is larger than any known epoch. For this case I decided to throw an exception because the Fetch request handling code already checks for this condition and returns an error Fetch response. The leader returns an error Fetch response when this invariant is violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other words, based on the current implementation, I think it is a bug if `endOffsetForEpoch` returns `Optional.empty()`. 1. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954 2. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621 > the requested epoch is less than any known epoch we have When thinking through this case, I convinced myself that the leader can determine if it should send a snapshot simply by comparing "fetch offset" and "last fetched epoch" against the `oldestSnapshotId`. The `oldestSnapshotId` is the snapshot with an end offset equal to the log start offset. 
> The current epoch cache implementation handles this by returning the requested epoch with an end offset equal to the log start offset. So we detect the case here by checking that the returned epoch matches the requested epoch and the end offset matches the offset corresponding to the oldest snapshot, which should be the same as the log start offset. Right so far? Correct. My comment here assumes that the fetch offset is between the log start offset and log end offset, and that sending a snapshot is not required. When thinking through
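The `lastFetchedEpoch` fallback from the Scala diff in this review comment can be restated as a small standalone Java sketch. It is only an approximation under stated assumptions: `LastFetchedEpochSketch`, its nested `SnapshotId` class, and the explicit offset parameters are hypothetical stand-ins for the fields of `KafkaMetadataLog`, and `IllegalStateException` replaces `KafkaException` so that the block has no Kafka dependency.

```java
import java.util.Optional;
import java.util.OptionalInt;

// Hypothetical stand-in for the logic in the diff; not the KafkaMetadataLog class itself.
final class LastFetchedEpochSketch {

    // Minimal snapshot identifier: the snapshot's end offset and epoch.
    static final class SnapshotId {
        final long offset;
        final int epoch;

        SnapshotId(long offset, int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    static int lastFetchedEpoch(OptionalInt latestLogEpoch,
                                Optional<SnapshotId> latestSnapshotId,
                                long logStartOffset,
                                long logEndOffset) {
        // Prefer the epoch recorded in the log when one exists.
        if (latestLogEpoch.isPresent()) {
            return latestLogEpoch.getAsInt();
        }
        // Otherwise fall back to the snapshot epoch, but only if the log is empty
        // and starts exactly where the snapshot ends. With no snapshot, return 0.
        return latestSnapshotId.map(snapshot -> {
            if (snapshot.offset == logStartOffset && snapshot.offset == logEndOffset) {
                return snapshot.epoch;
            }
            // Stand-in for the KafkaException thrown in the diff.
            throw new IllegalStateException(
                "Log has no last fetched epoch but the snapshot does not match the log offsets");
        }).orElse(0);
    }
}
```

The three outcomes mirror the diff: an epoch from the log wins, an empty log aligned with the latest snapshot yields the snapshot's epoch, and a log with neither yields 0.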
[GitHub] [kafka] chia7712 commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
chia7712 commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r565826783 ## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ## @@ -1004,10 +998,6 @@ public void testWithRecords(Args args) { } Review comment: @g1geordie I filed a patch for the aforementioned idea. Please take a look at https://github.com/chia7712/kafka/pull/1/files. It uses explicit asserts (exception or expected value) for all parameters instead of just ignoring them.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565823251 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: Okay, I buy it. I'll delay the exception.
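The bounded wait in the diff above (wait for the thread to reach a terminal state, otherwise throw `TimeoutException`) can be sketched with plain `java.lang.Thread`. This is an assumption-laden illustration rather than the Kafka Streams code: `TimedShutdownSketch` and `awaitStop` are hypothetical names, and `Thread.join(long)` stands in for `StreamThread.waitOnThreadState`.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical helper; Thread.join stands in for StreamThread.waitOnThreadState.
final class TimedShutdownSketch {

    // Waits up to timeoutMs for the worker to terminate and throws if it is still alive.
    static void awaitStop(Thread worker, long timeoutMs)
            throws InterruptedException, TimeoutException {
        // Thread.join(0) means "wait forever", so a non-positive timeout skips the wait
        // and goes straight to the liveness check.
        if (timeoutMs > 0) {
            worker.join(timeoutMs);
        }
        if (worker.isAlive()) {
            throw new TimeoutException(
                "Thread " + worker.getName() + " did not stop in the allotted time");
        }
    }
}
```

Passing `Long.MAX_VALUE`, as the no-argument overload in the diff does, makes the wait effectively unbounded.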
[GitHub] [kafka] dengziming commented on pull request #9982: MINOR: remove some explicit type argument in generator
dengziming commented on pull request #9982: URL: https://github.com/apache/kafka/pull/9982#issuecomment-768802003 @chia7712 @cmccabe Hello, PTAL.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565820783 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } Review comment: yes
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
wcarlson5 commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565815347 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { +log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state); return true; } -if (state == State.PENDING_ERROR) { -log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped."); -if (waitOnState(State.ERROR, timeoutMs)) { +if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) { +log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state); +if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) { log.info("Streams client stopped to ERROR completely"); return true; +} else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) { +log.info("Streams client stopped to NOT_RUNNING completely"); +return true; } else { -log.info("Streams client cannot transition to ERROR completely within the timeout"); +log.warn("Streams client cannot transition to {}} completely within the timeout", state); Review comment: the state here doesn't make the log make sense. 
If the state is `PENDING_ERROR` then the log should say ERROR ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); +} catch (final IOException e) { +log.error("Unable to lock the state directory due to unexpected exception", e); +throw new ProcessorStateException("Failed to lock the state directory during startup", e); +} + +return stateDirLock != null; +} + +public UUID initializeProcessId() { Review comment: since it doesn't seem that we need to be very thrifty with space for this file would it make sense to write it in a more friendly format that would be easier to maintain? i.e. json or something, we are giving it a version number... ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { Review comment: I think this change makes a lot of sense. I don't think it changes the final behavior besides avoiding extra state change rejections from the logs, but it looks like they are replaced. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -133,6 +152,72 @@ private void configurePermissions(final File file) { } } +/** + * @return true if the state directory was successfully locked + */ +private boolean lockStateDirectory() { +final File lockFile = new File(stateDir, LOCK_FILE_NAME); +try { +stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE); +stateDirLock = tryLock(stateDirLockChannel); Review comment: Is there any case where we might want to release the lock of this state directory? It looks like we just hold it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
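On the question of releasing the state directory lock: the following is a minimal sketch, not the PR's code, of how a `FileChannel`-based lock can be acquired once and released on shutdown. The class `DirectoryLockSketch` and its methods are hypothetical names for illustration; only `FileChannel.open`, `FileChannel.tryLock`, `FileLock.release` and `OverlappingFileLockException` are standard JDK APIs.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: holds a directory-level lock until close() is called.
public class DirectoryLockSketch implements AutoCloseable {
    private final File lockFile;
    private FileChannel channel;
    private FileLock lock;

    public DirectoryLockSketch(final File dir) {
        this.lockFile = new File(dir, ".lock");
    }

    /** @return true if the directory lock was acquired */
    public boolean tryLock() throws IOException {
        if (channel == null) {
            channel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        }
        try {
            // Returns null if another process holds the lock.
            lock = channel.tryLock();
        } catch (final OverlappingFileLockException e) {
            // Thrown if this JVM already holds a lock on the same file.
            lock = null;
        }
        return lock != null;
    }

    /** Releases the lock (if held) and closes the channel. */
    @Override
    public void close() throws IOException {
        if (lock != null) {
            lock.release();
            lock = null;
        }
        if (channel != null) {
            channel.close();
            channel = null;
        }
    }
}
```

Used with try-with-resources, the lock would be released when the owning client closes; whether the PR should do this is the open question raised in the review comment.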
[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed
abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } -case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, +node: Node, +requestThrottleMs: Int, +errorMessage: Option[String] = None): FindCoordinatorResponse = { +new FindCoordinatorResponse( + new FindCoordinatorResponseData() +.setErrorCode(error.code) +.setErrorMessage(errorMessage.getOrElse(error.message)) +.setNodeId(node.id) +.setHost(node.host) +.setPort(node.port) 
+.setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): AbstractResponse = { -def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() -.setErrorCode(error.code) -.setErrorMessage(error.message) -.setNodeId(node.id) -.setHost(node.host) -.setPort(node.port) -.setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { +if (hasEnoughAliveBrokers(internalTopicName)) { + if (shouldForwardRequest(request)) { +forwardingManager.sendInterBrokerRequest( + getCreateTopicsRequest(Seq(internalTopicName)), + _ => ()) + } else { +val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) + +val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName)) +adminManager.createTopics( + config.requestTimeoutMs, + validateOnly = false, + topicConfigs, + Map.empty, + controllerMutationQuota, + _ => ()) + } } -val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) -} else { - val coordinatorEndpoint = topicMetadata.partitions.asScala -.find(_.partitionIndex == partition) -.filter(_.leaderId != MetadataResponse.NO_LEADER_ID) -.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) -.flatMap(_.getNode(request.context.listenerName)) -.filterNot(_.isEmpty) - - coordinatorEndpoint match { -case Some(endpoint) => - createFindCoordinatorResponse(Errors.NONE, endpoint) -case _ => - createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) + +requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse( + Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs)) + } else { +def createResponse(requestThrottleMs: Int): AbstractResponse = { + val responseBody = if 
(topicMetadata.head.errorCode != Errors.NONE.code) { +
[GitHub] [kafka] vvcephei commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
vvcephei commented on pull request #9420: URL: https://github.com/apache/kafka/pull/9420#issuecomment-768789634 Hey @dongjinleekr , Sorry for the force-push, but I had to rebase this and resolve a conflict before merging. Note that the conflict was from 462c89e0b436abd56864bea8bbcaf1ab70b7f66e, which re-organized the boolean conditions in the StateDirectory constructor, specifically where we warn if the state dir is a temp dir. After resolving the conflict, I noticed there's no test for that warning, so I added one to be sure it works. It also looked like the temp dir check could actually be a bit simpler, so I just tweaked it rather than leaving a new comment for you to address. I hope this is all ok. I'll let the tests run and merge in the morning, unless you have any objections. Thanks! -John
[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-3745: -- Assignee: Bill Bejeck > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Assignee: Bill Bejeck >Priority: Minor > Labels: api, kip > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. > KIP-149: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on pull request #9978: URL: https://github.com/apache/kafka/pull/9978#issuecomment-768786220 Not done with the tests, but I'd appreciate some feedback on the non-testing code and general idea -- any takers for review? @cadonna @vvcephei @guozhangwang @wcarlson5 @lct45
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565806577 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ## @@ -112,6 +118,19 @@ public void createTopics() throws Exception { CLUSTER.createTopic(outputTopic, 1, 3); } +@After +public void cleanUp() { +if (streamInstanceOne != null) { +streamInstanceOne.close(); +} +if (streamInstanceTwo != null) { +streamInstanceTwo.close(); +} +if (streamInstanceOneRecovery != null) { +streamInstanceOneRecovery.close(); +} Review comment: There are no logical changes to this test, I just had to refactor it a bit because we were creating two copies of the same KafkaStreams at the same time (with the same app.dir & state.dir), even though one of them wasn't started until much later. Since we do the state initialization inside the KafkaStreams constructor, this was no good
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565805758 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -416,11 +524,15 @@ private void cleanRemovedTasksCalledByUser() throws Exception { logPrefix(), dirName, id), exception ); -throw exception; Review comment: IDE was giving me a warning
[GitHub] [kafka] vvcephei merged pull request #9840: KAFKA-10867: Improved task idling
vvcephei merged pull request #9840: URL: https://github.com/apache/kafka/pull/9840
[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling
vvcephei commented on pull request #9840: URL: https://github.com/apache/kafka/pull/9840#issuecomment-768782409 Hmm, the Java 8 build appears to have hung after an hour and 58 minutes. It's been running for 3 hours and 30 minutes now. This is now the 16th build, and there have been multiple Java 8 successes to date, so I think it's environmental. I'll go ahead with the merge.
[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client
[ https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901 ] feyman edited comment on KAFKA-9689 at 1/28/21, 3:54 AM: - The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. The code change mainly contains 3 parts: 1) StreamThread should know if itself is leader in the consumer group, if yes, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). Current implementation leverages the assignment. 2.1 Each streamThread put the feature metadata(EOS feature version) in the SubscriptionInfo when subscribe 2.2 Upon receiving the JoinGroupResp, the leader will know the current feature version in the broker side, it can put the current broker side feature version(if updated) in the assignment as suggested feature version 2.3 when the follower receive the assignment in the SyncGroupResp, it will find the new broker side latest feature version 3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that I'm implementing the code as the sequence above, currently on 2, but need to discuss if step 2 make sense, haven't start step 3 yet. Questions to [~bchen225242] : A) 2.1 Might need to add a new field in the SubscriptionInfoData to include the client side feature metadata, it seems ok to me since SubscriptionInfoData is the stream-specific and doesn't seem to need a KIP for it, thoughts ? was (Author: feyman): The version detection flow leveraging the versioning system is as described in the section: Use case: {{group_coordinator}} feature flag in KIP-584. 
The code change mainly contains 3 parts: 1) StreamThread should know if itself is leader in the consumer group, if yes, it should periodically query the describeFeatures api to see if there are feature metadata updates 2) There should be some place to put the feature metadata in the MemberMetadata, either in the assignment(userData) or add a new field in the MemberMetadata(which involves public interface change). Current implementation levrages the assignment. 3) the StreamThread should dynamically switch to the new thread producer without affecting the existing tasks that > Automatic broker version detection to initialize stream client > -- > > Key: KAFKA-9689 > URL: https://issues.apache.org/jira/browse/KAFKA-9689 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > Eventually we shall deprecate the flag to suppress EOS thread producer > feature, instead we take version detection approach on broker to decide which > semantic to use. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565803134 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -54,14 +61,27 @@ private static final Logger log = LoggerFactory.getLogger(StateDirectory.class); static final String LOCK_FILE_NAME = ".lock"; +/* The process file is used to persist the process id across restarts. + * The version 0 schema consists only of the version number and UUID + * + * If you need to store additional metadata of the process you can bump the version number and append new fields. + * For compatibility reasons you should only ever add fields, and only by appending them to the end + */ +private static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata"; +private static final int PROCESS_FILE_VERSION = 0; Review comment: No idea if we'll ever want to add anything else to this file, but better to be safe and forward compatible than sad
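To illustrate the "version number plus UUID, append-only" idea described in the code comment above, here is a minimal sketch. The on-disk layout used by the PR is not shown in this message, so the layout below (one field per line of a UTF-8 text file) is purely an assumption, as are the names `ProcessFileSketch` and `loadOrCreate`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch: persists a process id so it is stable across restarts.
public class ProcessFileSketch {
    public static final int VERSION = 0;

    /** Returns the persisted UUID if the file exists, otherwise creates and persists a new one. */
    public static UUID loadOrCreate(final Path file) throws IOException {
        if (Files.exists(file)) {
            final List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            if (lines.size() >= 2) {
                // Line 0 is the version, line 1 the UUID. Later versions may only append
                // lines, so a reader written for version 0 can ignore anything after line 1.
                return UUID.fromString(lines.get(1).trim());
            }
        }
        final UUID processId = UUID.randomUUID();
        Files.write(file, Arrays.asList(Integer.toString(VERSION), processId.toString()), StandardCharsets.UTF_8);
        return processId;
    }
}
```

Because fields are only ever appended, a file written by a hypothetical later version (with extra trailing lines) would still yield the same UUID to this reader.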
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565802847 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { +addStreamStateChangeListener(kafkaStreams); +startStreamsAndWaitForRunning(kafkaStreams); + +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); +stateTransitionHistory.clear(); +assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION))); Review comment: But it isn't consistent, because if the thread removes itself then the timeout isn't started
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565802766 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) { } private boolean close(final long timeoutMs) { -if (state == State.ERROR) { -log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped."); +if (state == State.ERROR || state == State.NOT_RUNNING) { Review comment: Something I noticed during testing, I feel it makes sense for the handling of ERROR and NOT_RUNNING to parallel (same for the PENDING_ flavors). This is a slight change in behavior; now if a user calls `close()` while the instance is already closing, it will wait for the ongoing shutdown to complete before returning (with timeout).
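The behavior described above (terminal states return immediately, PENDING_ states wait for their matching terminal state) can be sketched roughly as follows. This is a simplified stand-alone model, not the `KafkaStreams` implementation: `CloseSketch`, its `State` enum and `setState` are hypothetical, and the actual shutdown work is elided. It derives the target state once, so the timeout message names ERROR or NOT_RUNNING rather than the pending state.

```java
// Hypothetical, simplified model of close() waiting on an ongoing shutdown.
public class CloseSketch {
    public enum State { RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private final Object stateLock = new Object();
    private State state = State.RUNNING;

    public void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // Blocks until the target state is reached or the timeout elapses.
    private boolean waitOnState(final State target, final long timeoutMs) {
        final long now = System.currentTimeMillis();
        // Saturate instead of overflowing for very large timeouts.
        final long deadline = timeoutMs > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + timeoutMs;
        synchronized (stateLock) {
            while (state != target) {
                final long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                try {
                    stateLock.wait(remaining);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public boolean close(final long timeoutMs) {
        final State current;
        synchronized (stateLock) {
            current = state;
        }
        if (current == State.ERROR || current == State.NOT_RUNNING) {
            return true; // already terminal, nothing to wait for
        }
        if (current == State.PENDING_ERROR || current == State.PENDING_SHUTDOWN) {
            // The terminal state that matches the pending one.
            final State target = current == State.PENDING_ERROR ? State.ERROR : State.NOT_RUNNING;
            final boolean reached = waitOnState(target, timeoutMs);
            if (!reached) {
                System.out.println("Cannot transition to " + target + " completely within the timeout");
            }
            return reached;
        }
        // RUNNING: initiate shutdown; the real shutdown work is elided in this sketch.
        setState(State.PENDING_SHUTDOWN);
        return waitOnState(State.NOT_RUNNING, timeoutMs);
    }
}
```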
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin return streamThread; } +private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) { +final MetricConfig metricConfig = new MetricConfig() +.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG))) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS); +final List reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class, + Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId)); +final JmxReporter jmxReporter = new JmxReporter(); +jmxReporter.configure(config.originals()); +reporters.add(jmxReporter); +final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, + config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)); +return new Metrics(metricConfig, reporters, time, metricsContext); +} + +private int getNumStreamThreads(final boolean hasGlobalTopology) { +final int numStreamThreads; +if (internalTopologyBuilder.hasNoNonGlobalTopology()) { +log.info("Overriding number of StreamThreads to zero for global-only topology"); +numStreamThreads = 0; +} else { +numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG); +} + +if (numStreamThreads == 0 && !hasGlobalTopology) { +log.error("Topology with no input topics will create no stream threads and no global thread."); +throw new TopologyException("Topology has no stream threads and no global threads, " + +"must subscribe to at least one source topic or global table."); +} +return numStreamThreads; 
Review comment: Just tried to factor some of the self-contained logic into helper methods, since I found it incredibly difficult to get oriented within the super-long KafkaStreams constructor
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565801932 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final Time time) throws StreamsException { this.config = config; this.time = time; + +this.internalTopologyBuilder = internalTopologyBuilder; +internalTopologyBuilder.rewriteTopology(config); + +// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception +taskTopology = internalTopologyBuilder.buildTopology(); +globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + +final boolean hasGlobalTopology = globalTaskTopology != null; +final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || +(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); + +try { +stateDirectory = new StateDirectory(config, time, hasPersistentStores); +processId = stateDirectory.initializeProcessId(); Review comment: this is the only logical change in the KafkaStreams constructor: the rest of the diff is due to moving things around in order to get everything initialized in the proper order
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565797234 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: It does seem like kind of a gray area. Still, the TimeoutException isn't necessarily saying that it failed, just that we didn't wait long enough for it to finish the shutdown. But we have at least definitely initiated the shutdown -- besides, if the thread really is stuck in its shutdown then it's probably a benefit to go ahead with the `removeMembersFromConsumerGroup` call to get it kicked out all the sooner. But, in the end, we really make no guarantees about the application should a user choose to ignore the TimeoutException (though they absolutely can). I can imagine that some users might choose to just swallow it and decide that they don't care if the shutdown is taking a long time. It's hard to say
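The distinction drawn above, that a timeout means "shutdown was initiated but did not finish in time" rather than "removal failed", can be sketched with plain JDK threads. This is an illustration only: `RemoveThreadSketch`, `Worker` and `removeWorker` are hypothetical, and it uses the checked `java.util.concurrent.TimeoutException` rather than Kafka's own exception type.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: initiate a worker's shutdown, then wait a bounded time for it to die.
public class RemoveThreadSketch {
    public static class Worker extends Thread {
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final long cleanupMs;

        public Worker(final String name, final long cleanupMs) {
            super(name);
            this.cleanupMs = cleanupMs;
            setDaemon(true);
        }

        @Override
        public void run() {
            try {
                while (running.get()) {
                    Thread.sleep(5);
                }
                Thread.sleep(cleanupMs); // simulated (possibly slow) cleanup
            } catch (final InterruptedException e) {
                // exit promptly when interrupted
            }
        }

        public void shutdown() {
            running.set(false);
        }
    }

    /** @return the name of the removed worker; throws if it is still alive after timeoutMs */
    public static String removeWorker(final Worker worker, final long timeoutMs)
            throws TimeoutException, InterruptedException {
        worker.shutdown(); // shutdown is always initiated, even if we later time out
        if (timeoutMs > 0) {
            worker.join(timeoutMs); // note: join(0) would wait forever, hence the guard
        }
        if (worker.isAlive()) {
            throw new TimeoutException("Thread " + worker.getName() + " did not stop in the allotted time");
        }
        return worker.getName();
    }
}
```

A caller that catches the exception knows the worker is still shutting down in the background, which is the gray area discussed in the comment.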
[GitHub] [kafka] chia7712 commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…
chia7712 commented on pull request #9906: URL: https://github.com/apache/kafka/pull/9906#issuecomment-768769711 Another point: ```testWriteControlBatchNotAllowedMagicV0``` and ```testWriteControlBatchNotAllowedMagicV1``` are almost the same. Could we merge them into a single test case?
[GitHub] [kafka] inponomarev commented on pull request #9107: KAFKA-5488: Add type-safe split() operator
inponomarev commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-768769254 > @inponomarev the failing tests seems to be due to a known issue that was fixed via #9768 > > Can you rebase your PR to pickup the fix so we can get a green build? Done rebasing, expect the fixes according to your latest review soon!
[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500
chia7712 commented on a change in pull request #9967: URL: https://github.com/apache/kafka/pull/9967#discussion_r565786393 ## File path: core/src/main/scala/kafka/server/Server.scala ## @@ -46,6 +46,22 @@ object Server { new Metrics(metricConfig, reporters, time, true, metricsContext) } + def initializeMetrics( +config: KafkaConfig, +time: Time, +metaProps: MetaProperties Review comment: It seems to me that the properties in ```MetaProperties``` duplicate those in ```KafkaConfig``` in this case. Is there any reason we need to pass ```MetaProperties```?
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565785613 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { +addStreamStateChangeListener(kafkaStreams); +startStreamsAndWaitForRunning(kafkaStreams); + +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); +stateTransitionHistory.clear(); +assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION))); Review comment: I don't either...
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: H. That is interesting. I am not sure. If the thread hasn't been removed then we don't want to resize the cache. The timeout is essentially saying that removing the thread failed. So is it right to then remove it anyways? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: Hm. That is interesting. I am not sure. If the thread hasn't been removed then we don't want to resize the cache. So would removing the thread and then throwing an exception be the right way of doing it? The timeout is essentially saying that removing the thread failed, so is it right to then remove it anyway?
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565783921 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1843,6 +1843,8 @@ class KafkaApis(val requestChannel: RequestChannel, .setNumPartitions(-1) .setReplicationFactor(-1) .setTopicConfigErrorCode(Errors.NONE.code) + } else { + result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(), Uuid.ZERO_UUID)) Review comment: I've added something like this to ZkAdminManager. Let me know if it makes sense.
[GitHub] [kafka] hachikuji commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
hachikuji commented on pull request #9589: URL: https://github.com/apache/kafka/pull/9589#issuecomment-768756102 @twobeeb Before I merge, would you mind updating the PR description? Also, I will leave it to you to add the doc suggestion from @skaundinya15.
[GitHub] [kafka] gardnervickers commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.
gardnervickers commented on a change in pull request #9980: URL: https://github.com/apache/kafka/pull/9980#discussion_r565781027 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -63,7 +63,7 @@ private[log] object ProducerStateEntry { private[log] val NumBatchesToRetain = 5 def empty(producerId: Long) = new ProducerStateEntry(producerId, -batchMetadata = mutable.Queue[BatchMetadata](), +batchMetadata = new mutable.Queue[BatchMetadata](5), Review comment: Yes, good suggestion :)
[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
skaundinya15 commented on a change in pull request #9589: URL: https://github.com/apache/kafka/pull/9589#discussion_r565772802 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java ## @@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) { public List clusterPairs() { List pairs = new ArrayList<>(); Set clusters = clusters(); +Map originalStrings = originalsStrings(); +boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT; +if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + for (String source : clusters) { for (String target : clusters) { -SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target); if (!source.equals(target)) { -pairs.add(sourceAndTarget); +String clusterPairConfigPrefix = source + "->" + target + "."; +boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false")); +boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled; +if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + +if (clusterPairEnabled || clusterPairHeartbeatsEnabled) { Review comment: ```suggestion // By default, all source->target Herder combinations are created even if `x->y.enabled=false` // Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false` // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required : // B->A for the MirrorHeartbeatConnector (emits heartbeats into A for monitoring replication health) // A->B for the MirrorSourceConnector (actual replication flow) if (clusterPairEnabled || 
clusterPairHeartbeatsEnabled) { ``` Looks good to me, just had a small tweak for the `B->A` comment. Thanks!
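The pair-selection rule discussed in this review can be sketched in isolation. The following is a minimal plain-Java sketch, not the `MirrorMakerConfig` code itself: it assumes the properties arrive as a flat `Map<String, String>`, that the heartbeat key is `emit.heartbeats.enabled` (as named in the suggested comment), and that heartbeats default to enabled (inferred from "by default, all source->target Herder combinations are created"). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class ClusterPairSketch {
    // Assumed key and default; both are inferred from the review comment, not verified.
    static final String HEARTBEATS_KEY = "emit.heartbeats.enabled";
    static final boolean HEARTBEATS_DEFAULT = true;

    // Returns "source->target" for every pair that would get a herder.
    static List<String> enabledPairs(Set<String> clusters, Map<String, String> props) {
        boolean globalHeartbeats = props.containsKey(HEARTBEATS_KEY)
                ? Boolean.parseBoolean(props.get(HEARTBEATS_KEY))
                : HEARTBEATS_DEFAULT;
        List<String> pairs = new ArrayList<>();
        // TreeSet only makes the iteration order deterministic for the example.
        for (String source : new TreeSet<>(clusters)) {
            for (String target : new TreeSet<>(clusters)) {
                if (source.equals(target)) {
                    continue;
                }
                String prefix = source + "->" + target + ".";
                boolean pairEnabled =
                        Boolean.parseBoolean(props.getOrDefault(prefix + "enabled", "false"));
                // A pair-level heartbeat setting overrides the global one.
                boolean pairHeartbeats = props.containsKey(prefix + HEARTBEATS_KEY)
                        ? Boolean.parseBoolean(props.get(prefix + HEARTBEATS_KEY))
                        : globalHeartbeats;
                if (pairEnabled || pairHeartbeats) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Set<String> clusters = Set.of("A", "B");
        // Heartbeats on by default: the reverse pair B->A is kept as well.
        System.out.println(enabledPairs(clusters, Map.of("A->B.enabled", "true")));
        // Heartbeats off globally: only the explicitly enabled pair remains.
        System.out.println(enabledPairs(clusters,
                Map.of("A->B.enabled", "true", HEARTBEATS_KEY, "false")));
    }
}
```

Under these assumptions, disabling heartbeats globally is what reduces the herder set to the explicitly enabled flows, which matches the intent stated in the PR title.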
[GitHub] [kafka] hachikuji commented on pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500
hachikuji commented on pull request #9967: URL: https://github.com/apache/kafka/pull/9967#issuecomment-768742967 @chia7712 @ijuma Thanks for the comments thus far. This is ready for another look when you have time.
[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface
[ https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-3745: -- Assignee: (was: Bill Bejeck) > Consider adding join key to ValueJoiner interface > - > > Key: KAFKA-3745 > URL: https://issues.apache.org/jira/browse/KAFKA-3745 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Greg Fodor >Priority: Minor > Labels: api, kip > > In working with Kafka Stream joining, it's sometimes the case that a join key > is not actually present in the values of the joins themselves (if, for > example, a previous transform generated an ephemeral join key.) In such > cases, the actual key of the join is not available in the ValueJoiner > implementation to be used to construct the final joined value. This can be > worked around by explicitly threading the join key into the value if needed, > but it seems like extending the interface to pass the join key along as well > would be helpful. > KIP-149: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings
mjsax commented on pull request #9420: URL: https://github.com/apache/kafka/pull/9420#issuecomment-768709672 @dongjinleekr -- the PR shows merge conflicts. Can you rebase once more? Sorry about that.
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273249#comment-17273249 ] Matthias J. Sax commented on KAFKA-6520: I am wondering if https://issues.apache.org/jira/browse/KAFKA-10866 (merged recently) is something we could exploit to implement a DISCONNECTED state? The new metadata contains a `receivedTimestamp` field and thus we could track the time difference between "now" and the last received fetch response. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Vince Mu >Priority: Major > Labels: newbie, user-experience > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams] > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. This is a link to a related > issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. 
> * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicating the state. > [~Yohan123] WDYT?
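The idea in the comment above, comparing "now" against the timestamp of the last received fetch response, can be illustrated with a small standalone sketch. This is not Kafka code: the class name, the method names, and the silence threshold are all hypothetical, and the sketch only assumes that some caller feeds in a `receivedTimestamp`-style value in milliseconds whenever a fetch response arrives.

```java
import java.util.concurrent.atomic.AtomicLong;

final class FetchLivenessSketch {
    private final long maxSilenceMs;          // hypothetical threshold chosen by the application
    private final AtomicLong lastReceivedMs;  // newest fetch-response timestamp seen so far

    FetchLivenessSketch(long startMs, long maxSilenceMs) {
        this.maxSilenceMs = maxSilenceMs;
        this.lastReceivedMs = new AtomicLong(startMs);
    }

    // Called with the timestamp of each received fetch response; keeps the maximum
    // so that out-of-order updates from several threads cannot move the clock back.
    void onFetchResponse(long receivedTimestampMs) {
        lastReceivedMs.accumulateAndGet(receivedTimestampMs, Math::max);
    }

    // True when no fetch response has been seen for longer than the threshold.
    boolean looksDisconnected(long nowMs) {
        return nowMs - lastReceivedMs.get() > maxSilenceMs;
    }

    public static void main(String[] args) {
        FetchLivenessSketch liveness = new FetchLivenessSketch(0L, 30_000L);
        liveness.onFetchResponse(10_000L);
        System.out.println(liveness.looksDisconnected(20_000L)); // within the threshold
        System.out.println(liveness.looksDisconnected(45_000L)); // silent for 35 s
    }
}
```

A check like this only reports prolonged silence; whether that should map to a DISCONNECTED state or to a metric, as the issue description proposes, is the open design question in the thread.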
[jira] [Commented] (KAFKA-12220) Replace PowerMock by Mockito
[ https://issues.apache.org/jira/browse/KAFKA-12220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273244#comment-17273244 ] Chia-Ping Tsai commented on KAFKA-12220: [~ijuma] How about splitting PR by package?
||package||classes||
|org.apache.kafka.connect.runtime.standalone|1|
|org.apache.kafka.connect.runtime.distributed|3|
|org.apache.kafka.connect.runtime.errors|2|
|org.apache.kafka.connect.runtime.rest|3|
|org.apache.kafka.connect.util|3|
|org.apache.kafka.connect.storage|4|
|org.apache.kafka.connect.runtime|9|
> Replace PowerMock by Mockito > > > Key: KAFKA-12220 > URL: https://issues.apache.org/jira/browse/KAFKA-12220 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > We are migrating project from junit 4 to junit 5 (KAFKA-7339). PowerMock, > however, does not support junit 5 totally > (https://github.com/powermock/powermock/issues/830). Hence, we ought to > replace PowerMock by Mockito before migrating to junit 5 since rewriting all > tests which are depending on PowerMock can bring a bunch of changes.
[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration
mjsax commented on pull request #9708: URL: https://github.com/apache/kafka/pull/9708#issuecomment-768685944 Thanks @lct45! For reference: https://github.com/apache/kafka/pull/9951
[GitHub] [kafka] chia7712 merged pull request #9981: MINOR: Upgrade to Scala 2.12.13
chia7712 merged pull request #9981: URL: https://github.com/apache/kafka/pull/9981
[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13
chia7712 commented on pull request #9981: URL: https://github.com/apache/kafka/pull/9981#issuecomment-768681359 build pass
[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts
ableegoldman commented on a change in pull request #9978: URL: https://github.com/apache/kafka/pull/9978#discussion_r565746851 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -782,8 +783,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, final Time time) throws StreamsException { this.config = config; this.time = time; + +this.internalTopologyBuilder = internalTopologyBuilder; +internalTopologyBuilder.rewriteTopology(config); + +// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception +taskTopology = internalTopologyBuilder.buildTopology(); +globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology(); + +final boolean hasGlobalTopology = globalTaskTopology != null; +final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() || +(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore()); + +try { +stateDirectory = new StateDirectory(config, time, hasPersistentStores); +processId = stateDirectory.getProcessId(); Review comment: This is the only real change in the constructor, but I had to move a few things around and tried to organize them as I went
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565741567 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } Review comment: One last thing, can you add a version of the `shouldRemoveStreamThread()` test that uses static membership?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565740953 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +if (groupInstanceID.isPresent()) { Review comment: Yeah I think that makes sense here
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565731661 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -319,9 +319,9 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin StreamThread.State.PARTITIONS_ASSIGNED); return null; }).anyTimes(); + EasyMock.expect(thread.getGroupInstanceID()).andReturn(Optional.empty()).anyTimes(); Review comment: ```suggestion EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty()); ``` ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { +try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) { +addStreamStateChangeListener(kafkaStreams); +startStreamsAndWaitForRunning(kafkaStreams); + +final int oldThreadCount = kafkaStreams.localThreadsMetadata().size(); +stateTransitionHistory.clear(); +assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION))); Review comment: It's a bit weird to test this by passing in a negative timeout but I don't have any good ideas for forcing it to exceed the timeout ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java ## @@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception { } } +@Test +public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception { Review comment: ```suggestion public void shouldNotRemoveStreamThreadWithinTimeout() throws Exception { ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are 
alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the 
allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); Review comment: Hm actually now that I think about it, we should probably continue with the cleanup to leave the
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565734101 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +if (streamThread.getGroupInstanceID().isPresent()) { +final MemberToRemove memberToRemove = new MemberToRemove(streamThread.getGroupInstanceID().get()); +final Collection membersToRemove = Collections.singletonList(memberToRemove); +final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(membersToRemove)); +try { + removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs - begin, TimeUnit.MILLISECONDS); +} catch (final java.util.concurrent.TimeoutException e) { Review comment: We should. And I think maybe we should log the original stack trace
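The diff quoted above waits twice against one timeout: first for the thread to reach DEAD, then for the group-removal future, to which it passes `timeoutMs - begin`. Since `begin` holds a clock reading rather than an elapsed duration, it may help to see how a remaining budget is commonly derived from a start time. The following is a minimal sketch, not code from the PR: the class and method names are hypothetical, and it assumes `timeoutMs` is non-negative and that all values are in milliseconds.

```java
final class TimeoutBudgetSketch {
    // Remaining wait budget: the total timeout minus the time already spent.
    // Clamped at zero so a later wait never receives a negative timeout.
    static long remainingMs(long timeoutMs, long beginMs, long nowMs) {
        long elapsedMs = Math.max(0L, nowMs - beginMs);
        return Math.max(0L, timeoutMs - elapsedMs);
    }

    public static void main(String[] args) {
        // 5 s budget, started at t=1000, now t=3000: 3 s left for the second wait.
        System.out.println(remainingMs(5_000L, 1_000L, 3_000L));
        // Budget already exhausted: the second wait should not block at all.
        System.out.println(remainingMs(5_000L, 1_000L, 9_000L));
        // An effectively unbounded timeout stays effectively unbounded.
        System.out.println(remainingMs(Long.MAX_VALUE, 1_000L, 3_000L) == Long.MAX_VALUE - 2_000L);
    }
}
```

Subtracting the elapsed time rather than the start reading also keeps the `Long.MAX_VALUE` case used by the no-argument `removeStreamThread()` overload from turning negative.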
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565732778 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +return removeStreamThread(timeoutMs); +} + +private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { +final long begin = time.milliseconds(); if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { // make a copy of threads to avoid holding lock for (final StreamThread streamThread : new ArrayList<>(threads)) { if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) || threads.size() == 1)) { +final Optional groupInstanceID = 
streamThread.getGroupInstanceID(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +if (groupInstanceID.isPresent()) { Review comment: Ok, so we just do something like `if (groupInstanceID.isPresent() && !streamThread.getName().equals(Thread.currentThread().getName()))` when deciding whether to remove it from the group?
[GitHub] [kafka] vvcephei merged pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
vvcephei merged pull request #9836: URL: https://github.com/apache/kafka/pull/9836
[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
vvcephei commented on pull request #9836: URL: https://github.com/apache/kafka/pull/9836#issuecomment-768664736
Flaky test failures:
```
Build / JDK 11 / org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate()
Build / JDK 11 / org.apache.kafka.clients.producer.KafkaProducerTest.testHeadersWithExtendedClasses()
Build / JDK 15 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()
```
The most concerning one is the FetcherTest, but it's also failing on trunk.
[GitHub] [kafka] satishd commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.
satishd commented on a change in pull request #9980: URL: https://github.com/apache/kafka/pull/9980#discussion_r565730154 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -63,7 +63,7 @@ private[log] object ProducerStateEntry { private[log] val NumBatchesToRetain = 5 def empty(producerId: Long) = new ProducerStateEntry(producerId, -batchMetadata = mutable.Queue[BatchMetadata](), +batchMetadata = new mutable.Queue[BatchMetadata](5), Review comment: minor: you may want to write it as `new mutable.Queue[BatchMetadata](NumBatchesToRetain)` instead of hardcoding the value directly.
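The point of the suggestion, sizing the queue from the same constant that bounds it, can be illustrated with a small Java analogue. This is a sketch under assumptions: `BatchQueueSketch` is hypothetical, `ArrayDeque` stands in for Scala's `mutable.Queue`, and the evict-oldest policy is assumed for illustration rather than taken from the diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical Java analogue of the Scala snippet; not Kafka's ProducerStateEntry.
final class BatchQueueSketch {
    static final int NUM_BATCHES_TO_RETAIN = 5;

    // Initial capacity comes from the named constant rather than a literal 5,
    // so the sizing hint and the retention bound cannot drift apart.
    static Deque<Integer> emptyQueue() {
        return new ArrayDeque<>(NUM_BATCHES_TO_RETAIN);
    }

    // Assumed policy for illustration: drop the oldest entry once the bound is reached.
    static void addBatch(final Deque<Integer> queue, final int batchSequence) {
        if (queue.size() == NUM_BATCHES_TO_RETAIN) {
            queue.removeFirst();
        }
        queue.addLast(batchSequence);
    }
}
```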
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565720574 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { Review comment: We generally don't explicitly make this part of the API, and just inform users through the javadocs as you've done ```suggestion public Optional removeStreamThread(final Duration timeout) { ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin * no stream threads are alive */ public Optional removeStreamThread() { +return removeStreamThread(Long.MAX_VALUE); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. 
+ * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time Review comment: ```suggestion * @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -88,9 +91,11 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.errors.TimeoutException; Review comment: nit: move the import to the other `o.a.k.*` imports ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +if 
(streamThread.getGroupInstanceID().isPresent()) { +final MemberToRemove memberToRemove = new MemberToRemove(streamThread.getGroupInstanceID().get()); +final Collection membersToRemove = Collections.singletonList(memberToRemove); +final RemoveMembersFromConsumerGroupResult
[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join
[ https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273206#comment-17273206 ] Matthias J. Sax commented on KAFKA-10847: - Thanks [~spena] – overall, using a second store might be the simplest solution, and if we can get some perf results we can make a better decision on whether the performance is acceptable or not. The only thing I tend to object to is the usage of _wall-clock_ time punctuation, because it would introduce non-determinism. And if we use stream-time punctuations, we could even avoid punctuations altogether and "piggy-back" the emission of left/outer join results each time we process an input record. Also take into account the grace period, i.e., we should only emit left/outer join results after a window closes (not when a window ends): window close = window end + grace period. > Avoid spurious left/outer join results in stream-stream join > - > > Key: KAFKA-10847 > URL: https://issues.apache.org/jira/browse/KAFKA-10847 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sergio Peña >Priority: Major > > KafkaStreams follows an eager execution model, i.e., it never buffers input > records but processes them right away. For left/outer stream-stream join, > this implies that a left/outer join result might be emitted before the window > end (or window close) time is reached. Thus, a record that will be an > inner-join result might produce an eager (and spurious) left/outer join > result. > We should change the implementation of the join to not emit eager left/outer > join results, but instead delay the emission of such results until after the window > grace period has passed.
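The rule "window close = window end + grace period", combined with stream time that only advances as records are processed, can be sketched as follows. `WindowCloseSketch` is a hypothetical helper, not Kafka Streams code, and the strict `>` comparison at the boundary is an assumption made for the illustration.

```java
// Hypothetical helper; not part of Kafka Streams.
final class WindowCloseSketch {
    // Stream time is the largest record timestamp seen so far; it never moves backwards.
    private long streamTimeMs = Long.MIN_VALUE;

    // "Piggy-back" on record processing: each input record may advance stream time.
    void observe(final long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    static long windowCloseMs(final long windowEndMs, final long gracePeriodMs) {
        return windowEndMs + gracePeriodMs;
    }

    // A left/outer result for a window would only be emitted once this returns true.
    // Assumption: the window counts as closed only when stream time is strictly past the close time.
    boolean isClosed(final long windowEndMs, final long gracePeriodMs) {
        return streamTimeMs > windowCloseMs(windowEndMs, gracePeriodMs);
    }
}
```

Because the decision depends only on record timestamps, replaying the same input yields the same emissions, which is the determinism argument made against wall-clock punctuation.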
[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565722046 ## File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala ## @@ -139,6 +145,25 @@ class KafkaNetworkChannelTest { } } + @Test + def testNonRoutableAddressUpdateRequest(): Unit = { +val destinationId = 2 +assertThrows(classOf[IllegalArgumentException], + () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0))) Review comment: Can we move this to `RaftConfigTest`? It's not really part of the behavior of `KafkaNetworkChannel`. ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -76,7 +87,48 @@ private final int electionBackoffMaxMs; private final int fetchTimeoutMs; private final int appendLingerMs; -private final Map voterConnections; +private final Map voterConnections; + +public static abstract class AddressSpec { + public abstract InetSocketAddress address(); Review comment: Do we need this in the abstract class? I was thinking we would only be able to access `InetSocketAddress` if the type is `InetAddressSpec`. Otherwise the type protection from `AddressSpec` loses its bite. ## File path: core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala ## @@ -139,6 +145,25 @@ class KafkaNetworkChannelTest { } } + @Test + def testNonRoutableAddressUpdateRequest(): Unit = { +val destinationId = 2 +assertThrows(classOf[IllegalArgumentException], + () => new InetAddressSpec(new InetSocketAddress("0.0.0.0", 0))) + +// Update channel with a valid endpoint Review comment: Not sure there's much value in the rest of this test. Seems effectively the same as `testSendAndReceiveOutboundRequest`. 
## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -76,7 +87,48 @@ private final int electionBackoffMaxMs; private final int fetchTimeoutMs; private final int appendLingerMs; -private final Map voterConnections; +private final Map voterConnections; + +public static abstract class AddressSpec { + public abstract InetSocketAddress address(); + +@Override +public boolean equals(Object obj) { +if (this == obj) { +return true; +} + +if (obj == null || getClass() != obj.getClass()) { +return false; +} + +final AddressSpec that = (AddressSpec) obj; +return that.address().equals(address()); +} +} + +public static class InetAddressSpec extends AddressSpec { +private final InetSocketAddress address; + +public InetAddressSpec(InetSocketAddress address) { +if (address.equals(UNROUTABLE_ADDRESS)) { +throw new IllegalArgumentException("Address not routable"); +} +this.address = address; +} + +@Override +public InetSocketAddress address() { +return address; +} +} + +public static class UnknownAddressSpec extends AddressSpec { Review comment: A common pattern for classes like this without any state is to create a static instance. ```java public static final UnknownAddressSpec INSTANCE = new UnknownAddressSpec(); ``` ## File path: core/src/main/scala/kafka/raft/RaftManager.scala ## @@ -118,9 +119,20 @@ class KafkaRaftManager[T]( private val raftIoThread = new RaftIoThread(raftClient) def startup(): Unit = { +// Update the voter endpoints (if valid) with what's in RaftConfig +val voterAddresses: util.Map[Integer, AddressSpec] = raftConfig.quorumVoterConnections +for (voterAddressEntry <- voterAddresses.entrySet.asScala) { + voterAddressEntry.getValue match { +case spec: InetAddressSpec => { + netChannel.updateEndpoint(voterAddressEntry.getKey, spec) +} +case invalid: AddressSpec => { + logger.warn(s"Skipping channel update for destination ID: ${voterAddressEntry.getKey} " + Review comment: This could be `info` I think in the case of `UnknownAddressSpec`. 
It is expected behavior to skip the update. We could add a third case for unexpected `AddressSpec` types.
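The review points above (keep `address()` off the base type, use a shared instance for the stateless spec, and treat unknown and unexpected specs differently) can be combined into one sketch. All names below live in a hypothetical `AddressSpecSketch` holder and only mirror the names in the diff; the return strings of `handle` are placeholders for the real endpoint update and log calls.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch mirroring the names in the review; not the actual RaftConfig code.
final class AddressSpecSketch {
    static final InetSocketAddress NON_ROUTABLE = new InetSocketAddress("0.0.0.0", 0);

    // Marker type only: no address() here, so callers must narrow to InetAddressSpec first.
    interface AddressSpec { }

    static final class InetAddressSpec implements AddressSpec {
        private final InetSocketAddress address;

        InetAddressSpec(final InetSocketAddress address) {
            if (address == null || address.equals(NON_ROUTABLE)) {
                throw new IllegalArgumentException("Address not routable");
            }
            this.address = address;
        }

        InetSocketAddress address() {
            return address;
        }
    }

    // Stateless, so a single shared instance is enough.
    static final class UnknownAddressSpec implements AddressSpec {
        static final UnknownAddressSpec INSTANCE = new UnknownAddressSpec();

        private UnknownAddressSpec() { }
    }

    // Three-way dispatch: update, expected skip (info level), unexpected type (warn level).
    static String handle(final AddressSpec spec) {
        if (spec instanceof InetAddressSpec) {
            return "update " + ((InetAddressSpec) spec).address().getPort();
        } else if (spec instanceof UnknownAddressSpec) {
            return "skip";
        } else {
            return "unexpected";
        }
    }
}
```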
[GitHub] [kafka] twobeeb commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
twobeeb commented on a change in pull request #9589: URL: https://github.com/apache/kafka/pull/9589#discussion_r565704218 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java ## @@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) { public List clusterPairs() { List pairs = new ArrayList<>(); Set clusters = clusters(); +Map originalStrings = originalsStrings(); +boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT; +if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + for (String source : clusters) { for (String target : clusters) { -SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target); if (!source.equals(target)) { -pairs.add(sourceAndTarget); +String clusterPairConfigPrefix = source + "->" + target + "."; +boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false")); +boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled; +if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + +if (clusterPairEnabled || clusterPairHeartbeatsEnabled) { Review comment: Thanks for your review @skaundinya15. I'm having a hard time phrasing this properly, suggestions would be welcome. Is this comment proposition aligned with what you had in mind ? 
```suggestion
// By default, all source->target Herder combinations are created even if `x->y.enabled=false`
// Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false`
// Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required :
// B->A for the MirrorHeartbeatConnector (emits heartbeats into A)
// A->B for the MirrorSourceConnector (actual replication flow)
if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
```
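The pair-selection rule discussed in this thread can be exercised with a standalone sketch. `ClusterPairsSketch` is hypothetical: it takes a plain property map instead of `MirrorMakerConfig`, returns pair names as strings, and assumes that heartbeats default to enabled, which is what the proposed code comment implies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical standalone version of the selection rule; not MirrorMakerConfig itself.
final class ClusterPairsSketch {
    static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";
    // Assumption: heartbeats are on unless configured otherwise.
    static final boolean EMIT_HEARTBEATS_ENABLED_DEFAULT = true;

    static List<String> clusterPairs(final Set<String> clusters, final Map<String, String> props) {
        final boolean globalHeartbeats = Boolean.parseBoolean(
            props.getOrDefault(EMIT_HEARTBEATS_ENABLED, String.valueOf(EMIT_HEARTBEATS_ENABLED_DEFAULT)));
        final List<String> pairs = new ArrayList<>();
        for (final String source : clusters) {
            for (final String target : clusters) {
                if (source.equals(target)) {
                    continue;
                }
                final String prefix = source + "->" + target + ".";
                final boolean pairEnabled =
                    Boolean.parseBoolean(props.getOrDefault(prefix + "enabled", "false"));
                // A per-pair heartbeat setting overrides the global one.
                final boolean pairHeartbeats = Boolean.parseBoolean(
                    props.getOrDefault(prefix + EMIT_HEARTBEATS_ENABLED, String.valueOf(globalHeartbeats)));
                if (pairEnabled || pairHeartbeats) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs;
    }
}
```

With an empty configuration and clusters `A` and `B`, both `A->B` and `B->A` are returned; setting `emit.heartbeats.enabled=false` together with `A->B.enabled=true` leaves only `A->B`.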
[GitHub] [kafka] gharris1727 commented on a change in pull request #9987: KAFKA-10895: Gracefully handle invalid JAAS configs
gharris1727 commented on a change in pull request #9987: URL: https://github.com/apache/kafka/pull/9987#discussion_r565694653 ## File path: connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java ## @@ -80,7 +95,8 @@ public void close() throws IOException { @Override public void configure(Map configs) { - +// If we failed to retrieve a JAAS configuration during startup, throw that exception now +CONFIGURATION.get(); Review comment: Could you add a test which confirms that we're propagating the exception here? At the moment, the test verifies that the wrapping method works, but doesn't verify that it's used by the rest extension during the loading phase. ## File path: connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java ## @@ -63,4 +69,25 @@ public void testJaasConfigurationNotOverwritten() { assertNotEquals(overwrittenConfiguration, jaasFilter.getValue().configuration, "Overwritten JAAS configuration should not be used by basic auth REST extension"); } + +@Test +public void testBadJaasConfiguration() { +SecurityException jaasConfigurationException = new SecurityException(new IOException("Bad JAAS config is bad")); +Supplier configuration = BasicAuthSecurityRestExtension.initializeConfiguration(() -> { +throw jaasConfigurationException; +}); + +ConnectException thrownException = assertThrows(ConnectException.class, configuration::get); +assertEquals(jaasConfigurationException, thrownException.getCause()); +} + +@Test +public void testGoodJaasConfiguration() { +Configuration mockConfiguration = EasyMock.mock(Configuration.class); Review comment: The identity function could pass this test, but wouldn't have the behavior we need in the BasicAuthSecurityRestExtension. I wonder if there's a way to confirm that the mockConfiguration has been evaluated prior to calling `get()` on the returned supplier. 
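One way to make the "identity function would pass" concern concrete is a wrapper that evaluates its source eagerly and defers only the failure, plus a call counter that proves the evaluation happened before `get()`. This is a sketch under assumptions: `EagerSupplierSketch.initialize` is a hypothetical stand-in for `initializeConfiguration`, and `IllegalStateException` stands in for `ConnectException`.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the wrapping helper discussed above.
final class EagerSupplierSketch {
    static <T> Supplier<T> initialize(final Supplier<T> source) {
        try {
            // Evaluate right away, so later changes to global state cannot affect the value.
            final T value = source.get();
            return () -> value;
        } catch (final RuntimeException e) {
            // Defer the failure: it only surfaces if someone actually asks for the value.
            return () -> {
                throw new IllegalStateException("Failed to load configuration", e);
            };
        }
    }
}
```

A test can pass a source that increments a counter and assert that the counter is already 1 before `get()` is called on the returned supplier; an identity wrapper would leave it at 0 at that point.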
[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller
junrao commented on a change in pull request #9901: URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645 ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.timeline; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * SnapshottableHashTable implements a hash table that supports creating point-in-time + * snapshots. Each snapshot is immutable once it is created; the past cannot be changed. + * We handle divergences between the current state and historical state by copying a + * reference to elements that have been deleted or overwritten into the snapshot tiers + * in which they still exist. Each tier has its own hash table. + * + * In order to retrieve an object from epoch E, we only have to check two tiers: the + * current tier, and the tier associated with the snapshot from epoch E. This design + * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy + * references into multiple snapshot tiers sometimes when altering the current state. 
+ * In general, we don't expect there to be many snapshots at any given point in time, + * though. We expect to use about 2 snapshots at most. + * + * The current tier's data is stored in the fields inherited from BaseHashTable. It + * would be conceptually simpler to have a separate BaseHashTable object, but since Java + * doesn't have value types, subclassing is the only way to avoid another pointer + * indirection and the associated extra memory cost. + * + * In contrast, the data for snapshot tiers is stored in the Snapshot object itself. + * We access it by looking up our object reference in the Snapshot's IdentityHashMap. + * This design ensures that we can remove snapshots in O(1) time, simply by deleting the + * Snapshot object from the SnapshotRegistry. + * + * As mentioned before, an element only exists in a snapshot tier if the element was + * overwritten or removed from a later tier. If there are no changes between then and + * now, there is no data at all stored for the tier. We don't even store a hash table + * object for a tier unless there is at least one change between then and now. + * + * The class hierarchy looks like this: + * + *Revertable BaseHashTable + * ↑ ↑ + * SnapshottableHashTable → SnapshotRegistry → Snapshot + * ↑ ↑ + * TimelineHashSet TimelineHashMap + * + * BaseHashTable is a simple hash table that uses separate chaining. The interface is + * pretty bare-bones since this class is not intended to be used directly by end-users. + * + * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over + * snapshots. This is the core of the snapshotted hash table code and handles the + * tiering. + * + * TimelineHashSet and TimelineHashMap are mostly wrappers around this + * SnapshottableHashTable class. They implement standard Java APIs for Set and Map, + * respectively. 
There's a fair amount of boilerplate for this, but it's necessary so + * that timeline data structures can be used while writing idiomatic Java code. + * The accessor APIs have two versions -- one that looks at the current state, and one + * that looks at a historical snapshotted state. Mutation APIs only ever mutate thte Review comment: typo thte ## File path: metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java ## @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by
[GitHub] [kafka] C0urante opened a new pull request #9987: KAFKA-10895: Gracefully handle invalid JAAS configs
C0urante opened a new pull request #9987: URL: https://github.com/apache/kafka/pull/9987 Follow-up to https://github.com/apache/kafka/pull/9806 If an invalid JAAS config is present on the worker, invoking `Configuration::getConfiguration` throws an exception. The changes from #9806 cause that exception to be thrown during plugin scanning, which causes the worker to fail even if it is not configured to use the basic auth extension at all. This follow-up handles invalid JAAS configurations more gracefully, and only throws them if the worker is actually configured to use the basic auth extension, at the time that the extension is instantiated and configured. Two unit tests are added to test the green-path and red-path behavior of the extension when it encounters well-formed and ill-formed JAAS configurations, respectively. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565673883 ## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala ## @@ -34,6 +34,7 @@ import scala.collection.mutable object KafkaNetworkChannel { + val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0) Review comment: Yea, good catch. The AddressSpec makes sense.
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565673491 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -36,7 +36,9 @@ public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters"; public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " + "the set of voters in a comma-separated list of `{id}@{host}:{port}` entries. " + -"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`"; +"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" + +"If the voter endpoints are not known at startup, a non-routable address can be provided instead." + Review comment: Fair enough. I can move it there.
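The `{id}@{host}:{port}` format described in the doc string, including the non-routable placeholder, can be parsed as in the sketch below. `QuorumVotersSketch` is hypothetical and is not the parser in `RaftConfig`; representing an unknown endpoint as `Optional.empty()` and treating exactly `0.0.0.0:0` as the placeholder are assumptions made for the illustration.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical parser for illustration; not the RaftConfig implementation.
final class QuorumVotersSketch {
    static Map<Integer, Optional<InetSocketAddress>> parse(final String voters) {
        final Map<Integer, Optional<InetSocketAddress>> result = new LinkedHashMap<>();
        for (final String rawEntry : voters.split(",")) {
            final String entry = rawEntry.trim();
            final int at = entry.indexOf('@');
            final int colon = entry.lastIndexOf(':');
            if (at <= 0 || colon <= at + 1 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Expected {id}@{host}:{port} but got: " + entry);
            }
            final int id = Integer.parseInt(entry.substring(0, at));
            final String host = entry.substring(at + 1, colon);
            final int port = Integer.parseInt(entry.substring(colon + 1));
            final Optional<InetSocketAddress> endpoint;
            if (host.equals("0.0.0.0") && port == 0) {
                // Assumed placeholder meaning "endpoint not known at startup".
                endpoint = Optional.empty();
            } else {
                // createUnresolved avoids a DNS lookup while parsing.
                endpoint = Optional.of(InetSocketAddress.createUnresolved(host, port));
            }
            result.put(id, endpoint);
        }
        return result;
    }
}
```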
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565673305 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -208,8 +209,9 @@ public KafkaRaftClient( int fetchMaxWaitMs, OptionalInt nodeId, LogContext logContext, -Random random -) { +Random random, +RaftConfig raftConfig +) throws IOException { Review comment: Yea. This is an artifact from the `quorumState.initialize` change. Since that's moved down to the `client.initialize`, we can remove it.
[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565672756 ## File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java ## @@ -25,20 +25,25 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; public class MockNetworkChannel implements NetworkChannel { private final AtomicInteger correlationIdCounter; +private final Map addressCache; Review comment: Ack. I considered it. Figured it might be useful to have the endpoints for any future tests. Looks like the MockNetworkChannel doesn't test anything endpoint specific. Will remove
[GitHub] [kafka] mumrah opened a new pull request #9986: JUnit extensions for integration tests
mumrah opened a new pull request #9986: URL: https://github.com/apache/kafka/pull/9986 TBD
[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling
vvcephei commented on pull request #9840: URL: https://github.com/apache/kafka/pull/9840#issuecomment-768599882 There was a merge conflict with trunk. Rebased and pushed.
[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true
skaundinya15 commented on a change in pull request #9589: URL: https://github.com/apache/kafka/pull/9589#discussion_r565656131 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java ## @@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) { public List clusterPairs() { List pairs = new ArrayList<>(); Set clusters = clusters(); +Map originalStrings = originalsStrings(); +boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT; +if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + for (String source : clusters) { for (String target : clusters) { -SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target); if (!source.equals(target)) { -pairs.add(sourceAndTarget); +String clusterPairConfigPrefix = source + "->" + target + "."; +boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false")); +boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled; +if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) { +clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)); +} + +if (clusterPairEnabled || clusterPairHeartbeatsEnabled) { Review comment: Thanks for the explanation @twobeeb, this makes sense. It would be good to add some comments explaining this in the code as this isn't immediately obvious. Other than that it looks good to me overall. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12169) Consumer can not know paritions chage when client leader restart with static membership protocol
[ https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273158#comment-17273158 ] A. Sophie Blee-Goldman commented on KAFKA-12169: hey [~boyang] any thoughts? > Consumer can not know paritions chage when client leader restart with static > membership protocol > > > Key: KAFKA-12169 > URL: https://issues.apache.org/jira/browse/KAFKA-12169 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.5.1, 2.6.1 >Reporter: zou shengfu >Priority: Major > > Background: > Kafka consumer services run with static membership and the cooperative rebalance > protocol on Kubernetes, and the services often restart because of operations. When > we increased the topic's partitions from 1000 to 2000 and the client leader restarted > with an unknown member id at the same time, we found that the consumers did not > trigger a rebalance and still consumed only 1000 partitions > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565643848 ## File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java ## @@ -25,20 +25,25 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; public class MockNetworkChannel implements NetworkChannel { private final AtomicInteger correlationIdCounter; +private final Map addressCache; Review comment: On second thought, it seems worth keeping this as a set. It helps us ensure that no requests are sent to non-voters. I would just change the field name to `Set voterIds`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
hachikuji commented on a change in pull request #9985: URL: https://github.com/apache/kafka/pull/9985#discussion_r565634630 ## File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala ## @@ -34,6 +34,7 @@ import scala.collection.mutable object KafkaNetworkChannel { + val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0) Review comment: Let me suggest an alternative for the sake of argument. Currently, `RaftConfig.parseVoterConnections` return `Map`. This works for the case we're interested in, but there is a risk of our sentinel non-routable address leaking into unexpected cases (a common source of bugs in Kafkaland). Alternatively, what if we add something like this to `RaftConfig`: ```java public class RaftConfig { ... public Map quorumVoterConnections(); public static interface AddressSpec { } public static class InetAddressSpec implements AddressSpec { final InetSocketAddress address; } public static class UnknownAddressSpec implements AddressSpec { } } ``` The advantage is that this lets the type checker help us ensure that we are checking for a sentinel. ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -208,8 +209,9 @@ public KafkaRaftClient( int fetchMaxWaitMs, OptionalInt nodeId, LogContext logContext, -Random random -) { +Random random, +RaftConfig raftConfig +) throws IOException { Review comment: Is anything in here throwing `IOException`? ## File path: raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java ## @@ -25,20 +25,25 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; public class MockNetworkChannel implements NetworkChannel { private final AtomicInteger correlationIdCounter; +private final Map addressCache; Review comment: I don't think we are using the address here. Can we use `Set`? 
Potentially we could even get rid of this collection. It was more useful when the RaftClient itself was expected to discover the voter endpoints. ## File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java ## @@ -36,7 +36,9 @@ public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters"; public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " + "the set of voters in a comma-separated list of `{id}@{host}:{port}` entries. " + -"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`"; +"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" + +"If the voter endpoints are not known at startup, a non-routable address can be provided instead." + Review comment: Perhaps we can keep this as an internal feature for now. It is not something that a user would be able to leverage. We can document it in the class javadoc perhaps. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
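The `AddressSpec` alternative suggested in the review above can be fleshed out as a small runnable sketch. Everything here is illustrative: the wrapper class `AddressSpecSketch`, the `parse` and `routable` helpers, and the choice to treat `0.0.0.0:0` as the unknown marker are assumptions layered on the outline in the comment, not the `RaftConfig` API.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the suggested type-based sentinel; not the RaftConfig implementation.
final class AddressSpecSketch {
    interface AddressSpec { }

    static final class InetAddressSpec implements AddressSpec {
        final InetSocketAddress address;

        InetAddressSpec(InetSocketAddress address) {
            this.address = address;
        }
    }

    static final class UnknownAddressSpec implements AddressSpec { }

    // Parses "{id}@{host}:{port}" entries; "0.0.0.0:0" is assumed to mean "endpoint not known yet".
    static Map<Integer, AddressSpec> parse(String voters) {
        Map<Integer, AddressSpec> result = new HashMap<>();
        for (String entry : voters.split(",")) {
            String[] idAndEndpoint = entry.trim().split("@", 2);
            String endpoint = idAndEndpoint[1];
            int colon = endpoint.lastIndexOf(':');
            String host = endpoint.substring(0, colon);
            int port = Integer.parseInt(endpoint.substring(colon + 1));
            AddressSpec spec = (host.equals("0.0.0.0") && port == 0)
                ? new UnknownAddressSpec()
                : new InetAddressSpec(InetSocketAddress.createUnresolved(host, port));
            result.put(Integer.parseInt(idAndEndpoint[0]), spec);
        }
        return result;
    }

    // Callers must go through this check, so the sentinel cannot leak as a usable address.
    static Optional<InetSocketAddress> routable(AddressSpec spec) {
        if (spec instanceof InetAddressSpec) {
            return Optional.of(((InetAddressSpec) spec).address);
        }
        return Optional.empty();
    }
}
```

The point of the design is that code holding an `AddressSpec` cannot connect to it without first deciding what to do in the unknown case, whereas a sentinel `InetSocketAddress` is indistinguishable from a real one at the type level.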
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565640687 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +if (streamThread.getGroupInstanceID().isPresent()) { +final MemberToRemove memberToRemove = new MemberToRemove(streamThread.getGroupInstanceID().get()); +final Collection membersToRemove = Collections.singletonList(memberToRemove); +final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(membersToRemove)); +try { + removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs - begin, TimeUnit.MILLISECONDS); +} catch (final java.util.concurrent.TimeoutException e) { Review comment: have to make this a kafkaTimeout This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
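One way to read the note above: the second blocking call should wait only for whatever is left of the caller's timeout, and a `java.util.concurrent.TimeoutException` should be translated into the exception type the API exposes. The sketch below assumes `begin` is a start timestamp in milliseconds; `RemainingTimeoutSketch` and `SketchTimeoutException` are hypothetical names, the latter standing in for `org.apache.kafka.common.errors.TimeoutException` so that the example has no Kafka dependency.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the KafkaStreams implementation.
final class RemainingTimeoutSketch {
    // Stand-in for org.apache.kafka.common.errors.TimeoutException (an unchecked exception).
    static final class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Time left of the overall budget, never negative.
    static long remainingMs(long beginMs, long timeoutMs, long nowMs) {
        return Math.max(0L, timeoutMs - (nowMs - beginMs));
    }

    static <T> T await(Future<T> future, long beginMs, long timeoutMs) {
        long remaining = remainingMs(beginMs, timeoutMs, System.currentTimeMillis());
        try {
            return future.get(remaining, TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.TimeoutException e) {
            // Translate the JDK exception into the API's own timeout type.
            throw new SketchTimeoutException("Operation did not complete in the allotted time", e);
        } catch (ExecutionException e) {
            // Surface the underlying failure rather than the wrapper.
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Unwrapping the `ExecutionException` cause is only one possible policy; the review thread leaves that question open.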
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565635667 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); + streamThread.waitOnThreadState(StreamThread.State.DEAD, -1); } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +final Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); + adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(membersToRemove)); +return Optional.of(streamThread.getName()); +} +} +} +log.warn("There are no threads eligible for removal"); +} else { +log.warn("Cannot remove a stream thread when Kafka Streams client is in state " + state()); +} +return Optional.empty(); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. 
+ * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +if (isRunningOrRebalancing()) { +synchronized (changeThreadCount) { +// make a copy of threads to avoid holding lock +for (final StreamThread streamThread : new ArrayList<>(threads)) { +if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) +|| threads.size() == 1)) { +streamThread.shutdown(); +if (!streamThread.getName().equals(Thread.currentThread().getName())) { +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} +} +threads.remove(streamThread); +final long cacheSizePerThread = getCacheSizePerThread(threads.size()); +resizeThreadCache(cacheSizePerThread); +Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); + adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(membersToRemove)); Review comment: Sounds good. How should you handle the `ExecutionException`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration
[ https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-10716: --- Priority: Critical (was: Major) > Streams processId is unstable across restarts resulting in task mass migration > -- > > Key: KAFKA-10716 > URL: https://issues.apache.org/jira/browse/KAFKA-10716 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Critical > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > The new high availability feature of KIP-441 relies on deterministic > assignment to produce an eventually-stable assignment. The > HighAvailabilityTaskAssignor assigns tasks based on the unique processId > assigned to each client, so if the same set of Kafka Streams applications > participate in a rebalance it should generate the same task assignment every > time. > Unfortunately the processIds aren't stable across restarts. We generate a > random UUID in the KafkaStreams constructor, so each time the process starts > up it would be assigned a completely different processId. Unless this new > processId happens to be in exactly the same order as the previous one, a > single bounce or crash/restart can result in a large scale shuffling of tasks > based on a completely different eventual assignment. > Ultimately we should fix this via KAFKA-10121, but that's a nontrivial > undertaking and this bug merits some immediate relief if we don't intend to > tackle the larger problem in the upcoming releases -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10716) Streams processId is unstable across restarts resulting in task mass migration
[ https://issues.apache.org/jira/browse/KAFKA-10716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-10716: --- Fix Version/s: 2.6.2 > Streams processId is unstable across restarts resulting in task mass migration > -- > > Key: KAFKA-10716 > URL: https://issues.apache.org/jira/browse/KAFKA-10716 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > Fix For: 2.8.0, 2.7.1, 2.6.2 > > > The new high availability feature of KIP-441 relies on deterministic > assignment to produce an eventually-stable assignment. The > HighAvailabilityTaskAssignor assigns tasks based on the unique processId > assigned to each client, so if the same set of Kafka Streams applications > participate in a rebalance it should generate the same task assignment every > time. > Unfortunately the processIds aren't stable across restarts. We generate a > random UUID in the KafkaStreams constructor, so each time the process starts > up it would be assigned a completely different processId. Unless this new > processId happens to be in exactly the same order as the previous one, a > single bounce or crash/restart can result in a large scale shuffling of tasks > based on a completely different eventual assignment. > Ultimately we should fix this via KAFKA-10121, but that's a nontrivial > undertaking and this bug merits some immediate relief if we don't intend to > tackle the larger problem in the upcoming releases -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565628682 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); + streamThread.waitOnThreadState(StreamThread.State.DEAD, -1); } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +final Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); + adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(membersToRemove)); +return Optional.of(streamThread.getName()); +} +} +} +log.warn("There are no threads eligible for removal"); +} else { +log.warn("Cannot remove a stream thread when Kafka Streams client is in state " + state()); +} +return Optional.empty(); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. + * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. 
+ * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +if (isRunningOrRebalancing()) { +synchronized (changeThreadCount) { +// make a copy of threads to avoid holding lock +for (final StreamThread streamThread : new ArrayList<>(threads)) { +if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) +|| threads.size() == 1)) { +streamThread.shutdown(); +if (!streamThread.getName().equals(Thread.currentThread().getName())) { +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} +} +threads.remove(streamThread); +final long cacheSizePerThread = getCacheSizePerThread(threads.size()); +resizeThreadCache(cacheSizePerThread); +Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); Review comment: that works This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aloknnikhil opened a new pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses
aloknnikhil opened a new pull request #9985: URL: https://github.com/apache/kafka/pull/9985 With KIP-595, we expect the RaftConfig to specify the quorum voter endpoints upfront on startup. In the general case, this works fine. However, for testing we need a lazier approach that discovers the other voters in the quorum after startup (i.e. controller port bind). This approach also lends itself well to cases where we might have an observer that discovers the voter endpoints from, say, a `DescribeQuorum` event. ### Committer Checklist (excluded from commit message) - [x] Verify design and implementation - [x] Verify test coverage and CI build status - [x] Verify documentation (including upgrade notes)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565628241 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); + streamThread.waitOnThreadState(StreamThread.State.DEAD, -1); Review comment: good idea, I reworked that a bit
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565622524 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; } -public void waitOnThreadState(final StreamThread.State targetState) { +public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) { +if (timeoutMs < 0) { +timeoutMs = 0; +} else if (timeoutMs == 0) { +timeoutMs = Long.MAX_VALUE; Review comment: ah yeah, I had to fix this when I was writing my test
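A timed wait on a thread state, as discussed in this hunk, is usually written as a loop that recomputes the remaining time after every wake-up. The sketch below is a generic illustration and not the `StreamThread` code; `StateWaitSketch` is a hypothetical class, and its convention (negative timeout waits indefinitely, zero returns immediately) is an assumption that does not necessarily match the mapping in the diff or the final version of the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded wait on a state change; not StreamThread itself.
final class StateWaitSketch {
    enum State { RUNNING, DEAD }

    private final Object lock = new Object();
    private State state = State.RUNNING;

    void setState(State newState) {
        synchronized (lock) {
            state = newState;
            lock.notifyAll();
        }
    }

    // Returns true if the target state was reached, false on timeout or interruption.
    // Assumed convention: timeoutMs < 0 waits indefinitely, timeoutMs == 0 does not wait.
    boolean waitOnState(State target, long timeoutMs) {
        final long startNanos = System.nanoTime();
        synchronized (lock) {
            while (state != target) {
                try {
                    if (timeoutMs < 0) {
                        lock.wait();
                    } else {
                        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                        long remainingMs = timeoutMs - elapsedMs;
                        if (remainingMs <= 0) {
                            return false;
                        }
                        // Never pass 0 here: Object.wait(0) means "wait forever".
                        lock.wait(remainingMs);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
}
```

Returning a boolean lets the caller decide whether a timeout should be logged, retried, or turned into an exception, which matches how the calling code in the PR uses the return value.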
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565622163 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -91,6 +93,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; Review comment: I did not know that. Good catch.
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565621384 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -1147,6 +1162,10 @@ public String toString(final String indent) { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } +public String getGroupInstanceID() { Review comment: Either way we need to deal with it. I thought it would be easier to just do it once, but it is probably better practice to handle it later. I will change it to `Optional`.
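The change agreed on here, returning an `Optional` instead of an empty-string sentinel, lets the caller skip the group-removal call for non-static members without guessing what an empty id would do. A minimal sketch, where `GroupInstanceIdSketch` and `removeIfStatic` are hypothetical names and the `Consumer` stands in for the admin-client call:

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical sketch; the Consumer stands in for the admin call that removes a static member.
final class GroupInstanceIdSketch {
    // Invokes removeMember only when a group.instance.id is present (static membership).
    // Returns whether a removal was attempted.
    static boolean removeIfStatic(Optional<String> groupInstanceId, Consumer<String> removeMember) {
        groupInstanceId.ifPresent(removeMember);
        return groupInstanceId.isPresent();
    }
}
```

With an empty `Optional` the removal callback is never invoked, so the question of how the broker treats `""` as a `group.instance.id` does not arise.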
[GitHub] [kafka] bbejeck commented on pull request #9738: KAFKA-8744: Update Scala API to give names to processors
bbejeck commented on pull request #9738: URL: https://github.com/apache/kafka/pull/9738#issuecomment-768565772 Java 11 and 15 passed; the Java 8 failures are unrelated: ``` org.apache.kafka.clients.consumer.KafkaConsumerTest.testCloseWithTimeUnit() org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate() kafka.api.TransactionsBounceTest.testWithGroupMetadata() ``` Kicking off tests again to try and get a green build.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
ableegoldman commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565611900 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -91,6 +93,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; Review comment: There's actually a kafka-specific version of `TimeoutException` that you should use to keep in line with other kafka APIs. It's `org.apache.kafka.common.errors.TimeoutException` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1008,60 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); + streamThread.waitOnThreadState(StreamThread.State.DEAD, -1); } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +final Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); + adminClient.removeMembersFromConsumerGroup(config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(membersToRemove)); +return Optional.of(streamThread.getName()); +} +} +} +log.warn("There are no threads eligible for removal"); +} else { +log.warn("Cannot remove a stream thread when Kafka Streams client is in state " + state()); +} +return Optional.empty(); +} + +/** + * Removes one stream thread out of the running stream threads from this Kafka Streams client. + * + * The removed stream thread is gracefully shut down. This method does not specify which stream + * thread is shut down. 
+ * + * Since the number of stream threads decreases, the sizes of the caches in the remaining stream + * threads are adapted so that the sum of the cache sizes over all stream threads equals the total + * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}. + * + * @param timeout The the length of time to wait for the thread to shutdown + * @throws TimeoutException if the thread does not stop in time + * @return name of the removed stream thread or empty if a stream thread could not be removed because + * no stream threads are alive + */ +public Optional removeStreamThread(final Duration timeout) throws TimeoutException { +final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout"); +final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix); +if (isRunningOrRebalancing()) { +synchronized (changeThreadCount) { +// make a copy of threads to avoid holding lock +for (final StreamThread streamThread : new ArrayList<>(threads)) { +if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName()) +|| threads.size() == 1)) { +streamThread.shutdown(); +if (!streamThread.getName().equals(Thread.currentThread().getName())) { +if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { +log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time"); +throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time"); +} +} +threads.remove(streamThread); +final long cacheSizePerThread = getCacheSizePerThread(threads.size()); +resizeThreadCache(cacheSizePerThread); +Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); Review comment: I'm not sure how `removeMembersFromConsumerGroup` would behave if you passed in `""` as the `group.instance.id`, do you know? 
If not, then let's just be safe and check what `streamThread.getGroupInstanceID()` returns, and skip this call if there is no `group.instance.id` (i.e. if the member is not static) ## File path:
[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
cadonna commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565616503 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -1147,6 +1162,10 @@ public String toString(final String indent) { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } +public String getGroupInstanceID(){ +return mainConsumer.groupMetadata().groupInstanceId().orElse(""); Review comment: I would do it the same way.
[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
cadonna commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565615976 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -1147,6 +1162,10 @@ public String toString(final String indent) { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } +public String getGroupInstanceID() { Review comment: Why not an `Optional`?
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs
rajinisivaram commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r565615638 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel, Set.empty[MetadataResponseTopic] else unauthorizedForDescribeTopics.map(topic => - metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList())) + metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, false, util.Collections.emptyList())) Review comment: Good point. If the client is not authorized for describe when using topic IDs, we need to make sure we don't return the topic name or any information about whether the topic exists, i.e. we can't return TOPIC_AUTHORIZATION_FAILED. Perhaps UNKNOWN_TOPIC_ID would be more suitable.
[GitHub] [kafka] jolshan commented on a change in pull request #9944: KAFKA-10580: Add topic ID support to Fetch request
jolshan commented on a change in pull request #9944: URL: https://github.com/apache/kafka/pull/9944#discussion_r565612146 ## File path: clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java ## @@ -186,23 +241,37 @@ public String toString() { * incremental fetch requests (see below). */ private LinkedHashMap next; +private Map topicIds; +private Map topicNames; +private Map partitionsPerTopic; private final boolean copySessionPartitions; Builder() { this.next = new LinkedHashMap<>(); +this.topicIds = new HashMap<>(); +this.topicNames = new HashMap<>(); +this.partitionsPerTopic = new HashMap<>(); this.copySessionPartitions = true; } Builder(int initialSize, boolean copySessionPartitions) { this.next = new LinkedHashMap<>(initialSize); +this.topicIds = new HashMap<>(initialSize); +this.topicNames = new HashMap<>(initialSize); +this.partitionsPerTopic = new HashMap<>(initialSize); this.copySessionPartitions = copySessionPartitions; } /** * Mark that we want data from this partition in the upcoming fetch. */ -public void add(TopicPartition topicPartition, PartitionData data) { -next.put(topicPartition, data); +public void add(TopicPartition topicPartition, Uuid id, PartitionData data) { +if (next.put(topicPartition, data) == null) +partitionsPerTopic.merge(topicPartition.topic(), 1, (prev, next) -> prev + next); Review comment: I think I may want to do this in a simpler way. I want to keep track if we have IDs for all the topics and I'm not sure if there is a better way to figure out when a topic is no longer in a session besides checking all the topic partitions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
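The bookkeeping discussed in this comment, counting partitions per topic so that a topic can be dropped once its last partition leaves the session, can be sketched with `Map.merge` and `Map.computeIfPresent`. This is only an illustration under assumptions: `PartitionsPerTopicSketch` is a hypothetical class, partitions are modelled as (topic, partition) pairs instead of `TopicPartition`, and the removal path is not shown in the quoted diff.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-topic partition counting; not FetchSessionHandler itself.
final class PartitionsPerTopicSketch {
    private final Set<Map.Entry<String, Integer>> partitions = new HashSet<>();
    private final Map<String, Integer> partitionsPerTopic = new HashMap<>();

    void add(String topic, int partition) {
        // Count a partition only the first time it is added.
        if (partitions.add(new SimpleImmutableEntry<>(topic, partition))) {
            partitionsPerTopic.merge(topic, 1, Integer::sum);
        }
    }

    void remove(String topic, int partition) {
        if (partitions.remove(new SimpleImmutableEntry<>(topic, partition))) {
            // Returning null from the remapping function removes the topic entry entirely.
            partitionsPerTopic.computeIfPresent(topic, (t, count) -> count == 1 ? null : count - 1);
        }
    }

    boolean hasTopic(String topic) {
        return partitionsPerTopic.containsKey(topic);
    }

    int count(String topic) {
        return partitionsPerTopic.getOrDefault(topic, 0);
    }
}
```

The counter makes "is this topic still in the session?" a constant-time lookup, at the cost of keeping two structures in sync; scanning all topic partitions, the alternative mentioned in the comment, avoids that extra state.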
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 commented on a change in pull request #9984: URL: https://github.com/apache/kafka/pull/9984#discussion_r565607533 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; } -public void waitOnThreadState(final StreamThread.State targetState) { +public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) { +if (timeoutMs < 0) { Review comment: This is for the non-timeout uses. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -1147,6 +1162,10 @@ public String toString(final String indent) { return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent); } +public String getGroupInstanceID(){ +return mainConsumer.groupMetadata().groupInstanceId().orElse(""); Review comment: It seems easier to get it from here than from the config. It looked like I might have had to manipulate strings in that case. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1005,11 +1007,56 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin || threads.size() == 1)) { streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - streamThread.waitOnThreadState(StreamThread.State.DEAD); + streamThread.waitOnThreadState(StreamThread.State.DEAD, -1); } threads.remove(streamThread); final long cacheSizePerThread = getCacheSizePerThread(threads.size()); resizeThreadCache(cacheSizePerThread); +Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID())); Review comment: I ended up getting the `group.instance.id` from the streamThread. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
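The `waitOnThreadState(targetState, timeoutMs)` change above, where a negative timeout covers the existing non-timeout callers, could look roughly like the following. This is a hedged sketch, not the StreamThread implementation: the class, its `State` enum, and the lock-based waiting are assumptions chosen to keep the example self-contained.

```java
import java.util.concurrent.TimeUnit;

public class StateWaiterSketch {
    public enum State { RUNNING, DEAD }

    private final Object lock = new Object();
    private State state = State.RUNNING;

    public void setState(State newState) {
        synchronized (lock) {
            state = newState;
            lock.notifyAll();
        }
    }

    /**
     * Wait until the target state is reached. A negative timeoutMs waits
     * indefinitely (the non-timeout use); otherwise returns false if the
     * state was not reached within timeoutMs milliseconds.
     */
    public boolean waitOnState(State target, long timeoutMs) {
        synchronized (lock) {
            try {
                if (timeoutMs < 0) {
                    while (state != target) {
                        lock.wait();
                    }
                    return true;
                }
                long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                while (state != target) {
                    long remainingNanos = deadline - System.nanoTime();
                    if (remainingNanos <= 0) {
                        return false;
                    }
                    // Object.wait(0) means "wait forever", so wait at least 1 ms.
                    lock.wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
                }
                return true;
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and report that the wait did not complete.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Returning a boolean lets a caller such as a thread-removal path decide what to do when the thread has not shut down in time, while `-1` keeps the old blocking behaviour.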
[GitHub] [kafka] wcarlson5 opened a new pull request #9984: MINOR: add timeout and static group rebalance to remove thread
wcarlson5 opened a new pull request #9984: URL: https://github.com/apache/kafka/pull/9984 add timeout and static group rebalance to remove thread ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords
vvcephei commented on pull request #9836: URL: https://github.com/apache/kafka/pull/9836#issuecomment-768546193 Most of those failures were known flaky tests, but one was an EasyMock error. I'm not able to repro it locally after a rebase, though. Rebased, pushed, and trying one more time to get a clean build. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel, val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size) val toDelete = mutable.Set[String]() if (!controller.isActive) { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(error.code)) } sendResponseCallback(results) } else { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => +val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() - .setName(topic)) + .setName(name) + .setTopicId(topic.topicId())) } val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, results.asScala)(_.name) results.forEach { topic => - if (!authorizedTopics.contains(topic.name)) + val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null + val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID) Review comment: Thinking on this more, I can simplify the line. Especially if I make changes with the code above. 
[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete
jolshan commented on a change in pull request #9684: URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel, val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size) val toDelete = mutable.Set[String]() if (!controller.isActive) { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(Errors.NOT_CONTROLLER.code)) } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => results.add(new DeletableTopicResult() - .setName(topic) + .setName(topic.name()) + .setTopicId(topic.topicId()) .setErrorCode(error.code)) } sendResponseCallback(results) } else { - deleteTopicRequest.data.topicNames.forEach { topic => + deleteTopicRequest.topics().forEach { topic => +val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name() + else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null) results.add(new DeletableTopicResult() - .setName(topic)) + .setName(name) + .setTopicId(topic.topicId())) } val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC, results.asScala)(_.name) results.forEach { topic => - if (!authorizedTopics.contains(topic.name)) + val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null + val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID) Review comment: Thinking on this more, I can simplify the line. If topic name is null, then we didn't have a valid topic ID. 
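The simplification mentioned in the comment, treating a null resolved name as "no valid topic ID", can be sketched as follows. Everything here is an assumption for illustration: the class, the `Lookup` enum, and `java.util.UUID` standing in for Kafka's `Uuid` (with an all-zero UUID playing the role of `Uuid.ZERO_UUID`). It is not the KafkaApis code.

```java
import java.util.Map;
import java.util.UUID;

public class DeleteTopicResolveSketch {
    // Stand-in for Uuid.ZERO_UUID: means "no topic ID was supplied".
    public static final UUID ZERO = new UUID(0L, 0L);

    // Hypothetical classification of a delete request entry.
    public enum Lookup { BY_NAME, BY_ID, UNKNOWN_ID }

    /** Resolve the topic name: use the request's name unless an ID was supplied. */
    public static String resolveName(String requestedName, UUID topicId,
                                     Map<UUID, String> topicNames) {
        if (topicId.equals(ZERO)) {
            return requestedName;
        }
        return topicNames.get(topicId); // null when the ID is not known
    }

    /** A null resolved name can only mean the supplied ID was not valid. */
    public static Lookup classify(String resolvedName, UUID topicId) {
        if (resolvedName == null) {
            return Lookup.UNKNOWN_ID;
        }
        return topicId.equals(ZERO) ? Lookup.BY_NAME : Lookup.BY_ID;
    }
}
```

Because the name is only ever null when an ID lookup failed, a single null check can replace the two separate flags in the diff, assuming name-based requests always carry a non-null name.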