[GitHub] [kafka] xvrl opened a new pull request #9405: MINOR internal KIP-629 changes to methods and variables
xvrl opened a new pull request #9405: URL: https://github.com/apache/kafka/pull/9405
[GitHub] [kafka] xvrl opened a new pull request #9404: KAFKA-10589 replica verification tool changes for KIP-629
xvrl opened a new pull request #9404: URL: https://github.com/apache/kafka/pull/9404
depends on #9400, ignore the first commit. Still needs backwards-compatibility changes.
[jira] [Assigned] (KAFKA-10589) Rename kafka-replica-verification CLI command line arguments for KIP-629
[ https://issues.apache.org/jira/browse/KAFKA-10589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xavier Léauté reassigned KAFKA-10589:
Assignee: Xavier Léauté

> Rename kafka-replica-verification CLI command line arguments for KIP-629
> Key: KAFKA-10589
> URL: https://issues.apache.org/jira/browse/KAFKA-10589
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Xavier Léauté
> Assignee: Xavier Léauté
> Priority: Major
[GitHub] [kafka] xvrl opened a new pull request #9403: KAFKA-10573 Update connect transforms configs for KIP-629
xvrl opened a new pull request #9403: URL: https://github.com/apache/kafka/pull/9403
depends on #9367 (KAFKA-10570); ignore the first commit until the other PR is merged. cc @rhauch for review
[jira] [Assigned] (KAFKA-10588) Rename kafka-console-consumer CLI command line arguments for KIP-629
[ https://issues.apache.org/jira/browse/KAFKA-10588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xavier Léauté reassigned KAFKA-10588:
Assignee: Xavier Léauté

> Rename kafka-console-consumer CLI command line arguments for KIP-629
> Key: KAFKA-10588
> URL: https://issues.apache.org/jira/browse/KAFKA-10588
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Xavier Léauté
> Assignee: Xavier Léauté
> Priority: Major
[GitHub] [kafka] xvrl opened a new pull request #9402: update console consumer arguments for KIP-629
xvrl opened a new pull request #9402: URL: https://github.com/apache/kafka/pull/9402
draft PR, more changes needed in order to ensure backwards compatibility
[GitHub] [kafka] huxihx commented on pull request #9399: KAFKA-10584: IndexSearchType should use sealed trait instead of Enumeration
huxihx commented on pull request #9399: URL: https://github.com/apache/kafka/pull/9399#issuecomment-705990045
@junrao Please review this minor change.
[GitHub] [kafka] chia7712 commented on pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList
chia7712 commented on pull request #9400: URL: https://github.com/apache/kafka/pull/9400#issuecomment-705989392

> this was already discussed as part of KIP-629. The term "include" was chosen to align with existing configs we already had. Please see the KIP and the mailing list discussion

Got it. Thanks for the information!
[GitHub] [kafka] xvrl commented on pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList
xvrl commented on pull request #9400: URL: https://github.com/apache/kafka/pull/9400#issuecomment-705987912
@chia7712 this was already discussed as part of KIP-629. The term "include" was chosen to align with existing configs we already had. Please see the [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase) and [the mailing list discussion](https://lists.apache.org/thread.html/rbe19a71644c85c53de7ea5cfa00e4c90f530332f09758f24709b81f6%40%3Cdev.kafka.apache.org%3E)
[GitHub] [kafka] chia7712 opened a new pull request #9401: KAFKA-9628 Replace Produce request with automated protocol
chia7712 opened a new pull request #9401: URL: https://github.com/apache/kafka/pull/9401
issue: https://issues.apache.org/jira/browse/KAFKA-9628
This PR is a part of KAFKA-9628.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] chia7712 commented on pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList
chia7712 commented on pull request #9400: URL: https://github.com/apache/kafka/pull/9400#issuecomment-705984805
Is ```allowList``` more suitable to replace ```whiteList```?
[jira] [Commented] (KAFKA-9628) Replace Produce request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-9628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210629#comment-17210629 ]
Chia-Ping Tsai commented on KAFKA-9628:

> I've reassigned it to you, go for it

thanks! I will file two PRs to address request/response individually.

> Replace Produce request/response with automated protocol
> Key: KAFKA-9628
> URL: https://issues.apache.org/jira/browse/KAFKA-9628
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Assignee: Chia-Ping Tsai
> Priority: Major
[GitHub] [kafka] xvrl opened a new pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList
xvrl opened a new pull request #9400: URL: https://github.com/apache/kafka/pull/9400
rename internal classes and methods for KIP-629
[GitHub] [kafka] chia7712 commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…
chia7712 commented on pull request #9318: URL: https://github.com/apache/kafka/pull/9318#issuecomment-705979334
@hachikuji @dajac Could you take a look?
[jira] [Created] (KAFKA-10590) Whole kafka node hangs when one node goes down.
sandeep p created KAFKA-10590:
Summary: Whole kafka node hangs when one node goes down.
Key: KAFKA-10590
URL: https://issues.apache.org/jira/browse/KAFKA-10590
Project: Kafka
Issue Type: Bug
Components: KafkaConnect, zkclient
Affects Versions: 2.5.0
Environment: Ubuntu 16.04
Reporter: sandeep p

The whole cluster hangs when one of the three nodes goes down. To bring the cluster back, all three nodes need to be restarted.

[2020-10-08 19:40:13,607] WARN Client session timed out, have not heard from server in 12002ms for sessionid 0x2acefe0 (org.apache.zookeeper.ClientCnxn)
[2020-10-08 19:40:13,608] INFO Client session timed out, have not heard from server in 12002ms for sessionid 0x2acefe0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-08 19:40:13,866] INFO Opening socket connection to server 10.0.14.7/10.0.14.7:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-10-08 19:40:13,867] INFO Socket error occurred: 10.0.14.7/10.0.14.7:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
[2020-10-08 19:40:13,968] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-08 19:40:13,968] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-10-08 19:40:14,093] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (/10.0.2.5:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-10-08 19:40:14,093] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=205463854, epoch=INITIAL) to node 1: {}. (org.apache.kafka.clients.FetchSessionHandler)
java.io.IOException: Connection to 10.0.2.5:9092 (id: 1 rack: null) failed.
 at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
 at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
 at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:300)
 at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:135)
 at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:134)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[jira] [Comment Edited] (KAFKA-9726) LegacyReplicationPolicy for MM2 to mimic MM1
[ https://issues.apache.org/jira/browse/KAFKA-9726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210367#comment-17210367 ]
Ivan Yurchenko edited comment on KAFKA-9726 at 10/9/20, 4:35 AM:

[~ryannedolan] I've created a PR https://github.com/apache/kafka/pull/9395. Please have a look when you have a chance.

was (Author: ivanyu): https://github.com/apache/kafka/pull/9395

> LegacyReplicationPolicy for MM2 to mimic MM1
> Key: KAFKA-9726
> URL: https://issues.apache.org/jira/browse/KAFKA-9726
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker
> Reporter: Ryanne Dolan
> Assignee: Ivan Yurchenko
> Priority: Minor
>
> Per KIP-382, we should support MM2 in "legacy mode", i.e. with behavior
> similar to MM1. A key requirement for this is a ReplicationPolicy that does
> not rename topics.
[jira] [Commented] (KAFKA-10555) Improve client state machine
[ https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210606#comment-17210606 ]
Sophie Blee-Goldman commented on KAFKA-10555:

> Thus, it won't be possible to add a new thread when in NOT_RUNNING state following the current proposal of the KIP.

Ah, right. Alright, I just went back and re-read the final version of KIP-671 and KIP-663, and here's my summary of the overall current proposal across the relevant scenarios we need to consider:

1. User initiates client shutdown via KafkaStreams.close() --> current behavior is to transition to NOT_RUNNING (even if an error occurs during shutdown)
2. User initiates application shutdown via SHUTDOWN_KAFKA_STREAMS_APPLICATION enum --> current proposal is to transition to ERROR
3. User removes the last thread via KafkaStreams.removeStreamThread() --> current proposal is to transition to ERROR
4. User initiates client shutdown via SHUTDOWN_KAFKA_STREAMS_CLIENT enum --> current proposal is to transition to ERROR
5. Last thread is allowed to die via SHUTDOWN_STREAM_THREAD enum --> current proposal is to transition to ERROR
6. New thread is started via KafkaStreams#addStreamThread --> current proposal is that this is only possible in REBALANCING or RUNNING

Just to make sure we're all on the same page, note that under the current semantics the ERROR state is technically _not_ a terminal state. We enter ERROR when the last thread dies, at which point _it's the responsibility of the user to shut down the app_. So the only transition out of ERROR is to NOT_RUNNING (which is, currently, terminal), but if a user doesn't manually invoke this, it'll just hang out in ERROR forever, which means the cleanup thread, metrics thread, etc. continue on. Personally, I did not realize that until recently, and I think it's one of the top things to take this opportunity to reconsider. For one thing, if the only thing a user can possibly do once in the ERROR state is call close() and transition out of it, then why not just do it for them? I'd rather not leave the door open to partially-closed zombie Streams applications. For the rest of this doc I'm assuming that we reimagine the ERROR state to run parallel to the NOT_RUNNING state rather than being upstream of it, and that an application entering ERROR is always, and automatically, shut down.

Taking a step back, in the current, pre-KIP-663/671 world, it seems like Streams application operators face a tough choice: what to do if the instance goes into ERROR? If availability is most important, you probably just automatically restart it, e.g. via `new KafkaStreams...start()` or by killing the app to restart the pod, etc. But if the error was truly fatal, or potentially corrupting, restarting it would be anywhere from useless to disastrous (consider repeated overcounting). So if you're a #safetyfirst kind of operator, you'd want to shut everything down and inspect/resolve the error before restarting things. Unfortunately, right now there's no way for an operator to know when it's safe to automatically restart and when manual intervention is merited. So they have to make a choice whether to automatically restart or not, and whichever approach they choose is guaranteed to be wrong some of the time. It seems like the ERROR vs. NOT_RUNNING states are actually a natural solution to this problem: we say that NOT_RUNNING is recoverable and safe to restart however you like, including adding new threads. We then reserve the ERROR state for truly fatal errors that should be investigated and resolved before continuing. So ERROR basically means "restart at your own risk". In a post-KIP-663/671 world, users can listen in on the State to determine whether they should automatically restart.

This brings up the obvious question: which errors are which? I know I've said a lot without even touching on the actual question at hand, when to transition to what. Some of these cases feel pretty straightforward to me, some not so much.

Case #1: The current behavior of KafkaStreams.close() transitioning to NOT_RUNNING feels natural to me.

Case #2: The only reasonable way to interpret the SHUTDOWN_APPLICATION directive is to transition all clients to ERROR. I can't think of any reason you would want to shut down every instance in the application and then just restart it. If the triggering exception is bad enough to necessitate the nuclear option, well, that's probably not something you can just shrug off and continue.

Case #3: Removing the last stream thread should result in NOT_RUNNING, or possibly it could just remain in RUNNING. I am a bit concerned that this might go unnoticed, though. And then we'll get (another) escalation on Saturday night about consumer lag growing indefinitely.

Case #4: This seems like the trickiest scenario to me. I honestly don't think we can make this call for the us
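The "listen in on the State" pattern above can be sketched with the existing KafkaStreams#setStateListener API. A minimal sketch in Scala, assuming the reimagined semantics discussed in this comment (NOT_RUNNING is safe to restart automatically, ERROR is not); the restart/alert actions are placeholders, not part of any KIP:

```scala
import org.apache.kafka.streams.KafkaStreams

object StreamsRestartPolicy {
  // Restart automatically only from the "safe" terminal state; treat ERROR
  // as restart-at-your-own-risk, per the discussion above.
  def install(streams: KafkaStreams): Unit = {
    streams.setStateListener((newState: KafkaStreams.State, _: KafkaStreams.State) =>
      newState match {
        case KafkaStreams.State.NOT_RUNNING =>
          // recoverable: e.g. build and start a fresh KafkaStreams instance here
          println("client stopped cleanly; safe to restart automatically")
        case KafkaStreams.State.ERROR =>
          // truly fatal: alert an operator instead of restarting blindly
          println("client hit a fatal error; inspect before restarting")
        case _ => // ignore transient states such as REBALANCING
      })
  }
}
```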
[GitHub] [kafka] guozhangwang commented on pull request #9352: KAFKA-10533; KafkaRaftClient should flush log after appends
guozhangwang commented on pull request #9352: URL: https://github.com/apache/kafka/pull/9352#issuecomment-705954882
test this please
[jira] [Assigned] (KAFKA-10573) Rename connect transform configs for KIP-629
[ https://issues.apache.org/jira/browse/KAFKA-10573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xavier Léauté reassigned KAFKA-10573:
Assignee: Xavier Léauté

> Rename connect transform configs for KIP-629
> Key: KAFKA-10573
> URL: https://issues.apache.org/jira/browse/KAFKA-10573
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Xavier Léauté
> Assignee: Xavier Léauté
> Priority: Major
[GitHub] [kafka] xvrl commented on a change in pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629
xvrl commented on a change in pull request #9367: URL: https://github.com/apache/kafka/pull/9367#discussion_r502167123

## File path: clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.common.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ConfigUtils {

    private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class);

    /**
     * Translates deprecated configurations into their non-deprecated equivalents
     *
     * @param configs the input configuration
     * @param aliasGroups An array of arrays of synonyms. Each synonym array begins with the non-deprecated synonym.
     *                    For example, new String[][] { { a, b }, { c, d, e } }
     *                    would declare b as a deprecated synonym for a,
     *                    and d and e as deprecated synonyms for c.
     * @return a new configuration map with deprecated keys translated to their non-deprecated equivalents
     */
    public static Map<String, Object> translateDeprecatedConfigs(Map<String, Object> configs, String[][] aliasGroups) {
        Set<String> aliasSet = Stream.of(aliasGroups).flatMap(Stream::of).collect(Collectors.toSet());

        // pass through all configurations without aliases
        Map<String, Object> newConfigs = configs.entrySet().stream()
            .filter(e -> !aliasSet.contains(e.getKey()))
            // filter out null values
            .filter(e -> Objects.nonNull(e.getValue()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        Stream.of(aliasGroups).forEachOrdered(aliasGroup -> {
            String target = aliasGroup[0];

            List<String> deprecated = Stream.of(aliasGroup)
                .skip(1) // skip target
                .filter(configs::containsKey)
                .collect(Collectors.toList());

            if (deprecated.isEmpty()) {
                // No deprecated key(s) found.
                if (configs.containsKey(target)) {
                    newConfigs.put(target, configs.get(target));
                }
                return;
            }

            String aliasString = String.join(", ", deprecated);

            if (configs.containsKey(target)) {
                // Ignore the deprecated key(s) because the actual key was set.
                log.error(target + " was configured, as well as the deprecated alias(es) " +
                    aliasString + ". Using the value of " + target);
                newConfigs.put(target, configs.get(target));
            } else if (deprecated.size() > 1) {
                log.error("The configuration keys " + aliasString + " are deprecated and may be " +
```

Review comment: you have to thank @cmccabe for that ;)
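For context, a minimal usage sketch of the utility above, written in Scala. It assumes the KIP-629 JMX reporter renames (`metrics.jmx.blacklist` becoming a deprecated synonym of `metrics.jmx.exclude`); treat the exact key names as illustrative:

```scala
import java.util.{HashMap => JHashMap}
import org.apache.kafka.common.utils.ConfigUtils

object DeprecatedConfigExample {
  def main(args: Array[String]): Unit = {
    val configs = new JHashMap[String, Object]()
    configs.put("metrics.jmx.blacklist", "kafka.consumer.*")

    // Declare "metrics.jmx.blacklist" as a deprecated synonym of "metrics.jmx.exclude".
    val aliasGroups = Array(Array("metrics.jmx.exclude", "metrics.jmx.blacklist"))

    // The deprecated key's value is carried over under the new name,
    // and a deprecation message is logged.
    val translated = ConfigUtils.translateDeprecatedConfigs(configs, aliasGroups)
    println(translated.get("metrics.jmx.exclude")) // prints kafka.consumer.*
  }
}
```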
[jira] [Resolved] (KAFKA-10571) Replace occurrences of blackout with backoff for KIP-629
[ https://issues.apache.org/jira/browse/KAFKA-10571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xavier Léauté resolved KAFKA-10571.
Resolution: Fixed

> Replace occurrences of blackout with backoff for KIP-629
> Key: KAFKA-10571
> URL: https://issues.apache.org/jira/browse/KAFKA-10571
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Xavier Léauté
> Assignee: Xavier Léauté
> Priority: Major
[GitHub] [kafka] huxihx opened a new pull request #9399: KAFKA-10584: IndexSearchType should use sealed trait instead of Enumeration
huxihx opened a new pull request #9399: URL: https://github.com/apache/kafka/pull/9399
https://issues.apache.org/jira/browse/KAFKA-10584

In Scala, we prefer sealed traits over Enumeration since the former gives you exhaustiveness checking. With Scala Enumeration, you don't get a warning if you add a new value that is not handled in a given pattern match.

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
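To illustrate the exhaustiveness point, a minimal Scala sketch; the variant names here are hypothetical, not necessarily the ones used by the PR:

```scala
object IndexSearchExample {
  sealed trait IndexSearchType
  case object KeySearch extends IndexSearchType
  case object TimestampSearch extends IndexSearchType

  // With a sealed trait, the compiler warns if this match misses a case
  // added later; Scala's Enumeration gives no such warning.
  def describe(tpe: IndexSearchType): String = tpe match {
    case KeySearch       => "search the offset index by key"
    case TimestampSearch => "search the time index by timestamp"
  }
}
```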
[jira] [Assigned] (KAFKA-10584) IndexSearchType should use sealed trait instead of Enumeration
[ https://issues.apache.org/jira/browse/KAFKA-10584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
huxihx reassigned KAFKA-10584:
Assignee: huxihx

> IndexSearchType should use sealed trait instead of Enumeration
> Key: KAFKA-10584
> URL: https://issues.apache.org/jira/browse/KAFKA-10584
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Jun Rao
> Assignee: huxihx
> Priority: Major
> Labels: newbie
>
> In Scala, we prefer sealed traits over Enumeration since the former gives you
> exhaustiveness checking. With Scala Enumeration, you don't get a warning if
> you add a new value that is not handled in a given pattern match.
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502092508

## File path: core/src/main/scala/kafka/network/SocketServer.scala

```scala
  private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
    // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
    // the rate limit increases, because it is just one connection per listener and the code is simpler that way
    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
  }

  /**
   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
   * If an IP is given, metric config will be updated only for the given IP, otherwise
   * all metric configs will be checked and updated if required
   *
   * @param ip ip to update or default if None
   * @param maxConnectionRate new connection rate, or resets entity to default if None
   */
  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: Option[Int]): Unit = {
    def isIpConnectionRateMetric(metricName: MetricName) = {
      metricName.name == "connection-accept-rate" &&
      metricName.group == MetricsGroup &&
      metricName.tags.containsKey("ip")
    }

    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
      quotaLimit != metric.config.quota.bound
    }

    ip match {
      case Some(addr) =>
        val address = InetAddress.getByName(addr)
        if (maxConnectionRate.isDefined) {
          info(s"Updating max connection rate override for $address to ${maxConnectionRate.get}")
          connectionRatePerIp.put(address, maxConnectionRate.get)
        } else {
          info(s"Removing max connection rate override for $address")
          connectionRatePerIp.remove(address)
        }
        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
      case None =>
        val newQuota = maxConnectionRate.getOrElse(Int.MaxValue)
```

Review comment: actually in this case, it should reset to `DynamicConfig.Ip.DefaultConnectionCreationRate`, right?
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502092158

## File path: core/src/main/scala/kafka/network/SocketServer.scala

```scala
  private val listenerCounts = mutable.Map[ListenerName, Int]()
  private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, ListenerConnectionQuota]()
  @volatile private var totalCount = 0
  @volatile private var defaultConnectionRatePerIp = DynamicConfig.Ip.DefaultConnectionCreationRate
  private val inactiveSensorExpirationTimeSeconds = TimeUnit.HOURS.toSeconds(1);
  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
  private val lock = new ReentrantReadWriteLock()
  private val sensorAccessor = new SensorAccess(lock, metrics)
  // sensor that tracks broker-wide connection creation rate and limit (quota)
  private val brokerConnectionRateSensor = getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, BrokerQuotaEntity)
  private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)

  def inc(listenerName: ListenerName, address: InetAddress, acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
    counts.synchronized {
      val startThrottleTimeMs = time.milliseconds

      val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, startThrottleTimeMs)
```

Review comment: I see. Yes, I think unrecording is more efficient than keeping more delayed connections than needed. Basically, when you unrecord from the per-IP metric, you can also unrecord from the broker and listener metrics as well.
[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
splett2 commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502091435

## File path: core/src/main/scala/kafka/network/SocketServer.scala (quoting the same `inc` hunk shown above)

Review comment: @apovzner my reasoning for this is the following: consider the case where we accept a connection at the broker/listener level, but reject it at the IP level. We would have already recorded the broker connection, so we'd be allocating rate to a rejected connection. I suppose we can work around this in a similar manner to `recordIpConnectionMaybeThrottle`, by unrecording the listener/broker connection if the IP gets rejected.
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502091323

## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala

```scala
    verifyConnectionCountOnEveryListener(connectionQuotas, connectionsPerListener)
  }

  @Test
```

Review comment: It would be useful to add a test where we have both a per-listener and a per-IP limit, and verify that it throttles based on which limit is reached first. Something like: 2 IPs, each per-IP limit < per-listener limit, but the sum of per-IP limits > listener limit. So, if you reach the limit on one IP, the broker would not throttle the second IP until it reaches the per-listener limit. It does not have to be exactly this; we just need to verify how per-IP throttling interacts with per-listener throttling.
[GitHub] [kafka] gwenshap closed pull request #9398: MINOR update comments and docs to be gender-neutral
gwenshap closed pull request #9398: URL: https://github.com/apache/kafka/pull/9398
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502086777

## File path: core/src/main/scala/kafka/network/SocketServer.scala (quoting the same `updateIpConnectionRate` hunk shown above)

Review comment: You can use your new constant `DynamicConfig.Ip.UnlimitedConnectionCreationRate` instead of `Int.MaxValue` here.
[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits
apovzner commented on a change in pull request #9386: URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924

## File path: core/src/main/scala/kafka/network/SocketServer.scala (quoting the same `inc` hunk shown above)

Review comment: It would be more efficient if we throttled IPs **after** we know that we can accept a connection based on broker-wide and per-listener limits, since reaching broker/listener limits blocks the acceptor thread while throttling IPs needs more processing. Otherwise, if you reach both the broker and the per-IP limit, the broker will continue accepting and delaying connections where it would be justified to block an acceptor thread based on reaching the broker rate limit. Basically, call `waitForConnectionSlot` first, similar to how we check the per-IP limit on the number of connections after we know that we can accept a new connection based on broker/listener limits.
[GitHub] [kafka] gwenshap commented on a change in pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629
gwenshap commented on a change in pull request #9367: URL: https://github.com/apache/kafka/pull/9367#discussion_r502083707

## File path: clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java (quoting the same hunk shown above)

Review comment: I like the extra-detailed error messages, thank you.
[GitHub] [kafka] gwenshap commented on a change in pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629
gwenshap commented on a change in pull request #9367: URL: https://github.com/apache/kafka/pull/9367#discussion_r502082826

## File path: clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java (quoting the same hunk shown above)

Review comment: We may be able to save all this hassle by defining the alias groups as a map of `target` to a list of `deprecated` configs? We defined this as a 2-dim array, but we always convert it to lists...
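A sketch of the suggested shape, in Scala; the key names are illustrative, not taken from the PR:

```scala
object AliasGroupsShape {
  // Each non-deprecated target key maps to its deprecated synonyms,
  // replacing the positional String[][] encoding with a self-describing map.
  val aliasGroups: Map[String, List[String]] = Map(
    "metrics.jmx.exclude" -> List("metrics.jmx.blacklist"),
    "metrics.jmx.include" -> List("metrics.jmx.whitelist")
  )
}
```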
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r502078584

## File path: core/src/test/scala/unit/kafka/log/LogTest.scala

```diff
@@ -782,7 +782,7 @@ class LogTest {
     }

     // Retain snapshots for the last 2 segments
-    ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2))
+    ProducerStateManager.listSnapshotFiles(logDir).filter(_.offset < segmentOffsets(segmentOffsets.size - 2)).foreach(_.deleteIfExists())
```

Review comment: Yes, it should work if we switch these back to using deleteSnapshotsBefore. Thanks!
[GitHub] [kafka] guozhangwang commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils
guozhangwang commented on a change in pull request #9396: URL: https://github.com/apache/kafka/pull/9396#discussion_r502067124

## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java

```java
/* ... standard Apache License 2.0 header ... */
package org.apache.kafka.streams.processor.api;

// (imports of metrics, serialization, StreamsConfig, TopologyTestDriver,
//  processor, and state-store types elided)

/**
 * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor},
 * {@link Transformer}, and {@link ValueTransformer} implementations.
 *
 * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
 * tests that serve as example usage.
 *
 * Note that this class does not take any automated actions (such as firing scheduled punctuators).
 * It simply captures any data it witnesses.
 * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink
 * {@link Topology} and using the {@link TopologyTestDriver}.
 */
public class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
```

Review comment: Since we are adding a new class, could we have it extend InternalProcessorContext, so that when we remove the old ones we can also clean up the non-testing functions that branch on checking `instanceof InternalProcessorContext`? I'm assuming that `InternalProcessorContext` would stay in the end state; it would just extend `api.ProcessorContext` and not `StateStoreContext` in the future.

## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java (second quoted hunk truncated in the digest)
[GitHub] [kafka] xvrl opened a new pull request #9398: MINOR update comments and docs to be gender-neutral
xvrl opened a new pull request #9398: URL: https://github.com/apache/kafka/pull/9398
cc @gwenshap
[GitHub] [kafka] gwenshap closed pull request #9366: KAFKA-10571 Replace blackout with backoff for KIP-629
gwenshap closed pull request #9366: URL: https://github.com/apache/kafka/pull/9366
[GitHub] [kafka] xvrl commented on pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629
xvrl commented on pull request #9367: URL: https://github.com/apache/kafka/pull/9367#issuecomment-705863610
jdk8 test failures appear unrelated
[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
ableegoldman commented on a change in pull request #9383: URL: https://github.com/apache/kafka/pull/9383#discussion_r502052730

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java

```diff
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
         Collections.sort(subscription.topics());
         assertEquals(asList("topic1", "topic2"), subscription.topics());

-        final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);
```

Review comment: Yeah, it should be `0` the first time you call it, then `1` the second time, and then back to `0` again on the third call.
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210472#comment-17210472 ] Lauren McDonald commented on KAFKA-7447: We are on version 2.3.* and saw this issue multiple times. Up to you if you want to close...we put in some other manual fixes to get around it (like turning off auto leader rebalance, not ideal). > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says he they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regardling > __consumer_offsets-29 because it's just too much to paste, otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. 
(kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from __consumer_offsets-29 > (kaf
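The workaround described in the ticket corresponds to a single broker-side setting. A minimal sketch of the relevant server.properties entry (illustrative, not a blanket recommendation):
{code}
# Workaround from this ticket: stop the controller from automatically moving
# leadership back to a freshly restarted broker before it has fully caught up.
auto.leader.rebalance.enable=false
# With automatic rebalancing off, preferred leadership must be restored
# manually (e.g. with kafka-leader-election.sh) once all partitions are in sync.
{code}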
[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
junrao commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r502023899 ## File path: core/src/test/scala/unit/kafka/log/LogTest.scala ## @@ -782,7 +782,7 @@ class LogTest { } // Retain snapshots for the last 2 segments -ProducerStateManager.deleteSnapshotsBefore(logDir, segmentOffsets(segmentOffsets.size - 2)) +ProducerStateManager.listSnapshotFiles(logDir).filter(_.offset < segmentOffsets(segmentOffsets.size - 2)).foreach(_.deleteIfExists()) Review comment: Since deleteSnapshotsBefore() still exists, could we keep using it? ## File path: core/src/test/scala/unit/kafka/log/LogTest.scala ## @@ -794,16 +794,12 @@ class LogTest { // Only delete snapshots before the base offset of the recovery point segment (post KAFKA-5829 behaviour) to // avoid reading all segments -ProducerStateManager.deleteSnapshotsBefore(logDir, offsetForRecoveryPointSegment) +ProducerStateManager.listSnapshotFiles(logDir).filter(_.offset < offsetForRecoveryPointSegment).foreach(_.deleteIfExists()) Review comment: Since deleteSnapshotsBefore() still exists, could we keep using it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210470#comment-17210470 ] James Cheng commented on KAFKA-7447: Hi. The consensus seems to be that this was fixed in https://issues.apache.org/jira/browse/KAFKA-8896 Should we update the Status, Resolution, and Fixed fields to correspond to https://issues.apache.org/jira/browse/KAFKA-8896 ? > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offsets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regarding > __consumer_offsets-29 because it's just too much to paste, otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. 
The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from _
[GitHub] [kafka] cmccabe commented on pull request #9390: MINOR: Implement ApiError#equals and hashCode
cmccabe commented on pull request #9390: URL: https://github.com/apache/kafka/pull/9390#issuecomment-705828611 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #9390: MINOR: Implement ApiError#equals and hashCode
cmccabe commented on pull request #9390: URL: https://github.com/apache/kafka/pull/9390#issuecomment-705826094 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9628) Replace Produce request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-9628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210441#comment-17210441 ] Mickael Maison commented on KAFKA-9628: --- [~chia7712] I've reassigned it to you, go for it > Replace Produce request/response with automated protocol > > > Key: KAFKA-9628 > URL: https://issues.apache.org/jira/browse/KAFKA-9628 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Chia-Ping Tsai >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9628) Replace Produce request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-9628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-9628: - Assignee: Chia-Ping Tsai (was: Mickael Maison) > Replace Produce request/response with automated protocol > > > Key: KAFKA-9628 > URL: https://issues.apache.org/jira/browse/KAFKA-9628 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Chia-Ping Tsai >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501979388 ## File path: core/src/test/scala/unit/kafka/log/LogTest.scala ## @@ -1226,6 +1225,104 @@ class LogTest { assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt) } + @Test + def testRetentionDeletesProducerStateSnapshots(): Unit = { +val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0) +val log = createLog(logDir, logConfig) +val pid1 = 1L +val epoch = 0.toShort + +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 0), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 1), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 2), leaderEpoch = 0) + +log.updateHighWatermark(log.logEndOffset) + +assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) +// Sleep to breach the retention period +mockTime.sleep(1000 * 60 + 1) +log.deleteOldSegments() +// Sleep to breach the file delete delay and run scheduled file deletion tasks +mockTime.sleep(1) +assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size) + } + + @Test + def testLogStartOffsetMovementDeletesSnapshots(): Unit = { +val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0) +val log = createLog(logDir, logConfig) +val pid1 = 1L +val epoch = 0.toShort + +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 0), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 1), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 2), leaderEpoch = 0) +log.updateHighWatermark(log.logEndOffset) +assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) + +// Increment the log start offset to exclude the first two segments. 
+log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion) +log.deleteOldSegments() +// Sleep to breach the file delete delay and run scheduled file deletion tasks +mockTime.sleep(1) +assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size) + } + + @Test + def testCompactionDeletesProducerStateSnapshots(): Unit = { +val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0) +val log = createLog(logDir, logConfig) +val pid1 = 1L +val epoch = 0.toShort +val cleaner = new Cleaner(id = 0, + offsetMap = new FakeOffsetMap(Int.MaxValue), + ioBufferSize = 64 * 1024, + maxIoBufferSize = 64 * 1024, + dupBufferLoadFactor = 0.75, + throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime), + time = mockTime, + checkDone = _ => {}) + +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "a".getBytes())), producerId = pid1, + producerEpoch = epoch, sequence = 0), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "b".getBytes())), producerId = pid1, + producerEpoch = epoch, sequence = 1), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "c".getBytes())), producerId = pid1, + producerEpoch = epoch, sequence = 2), leaderEpoch = 0) +log.updateHighWatermark(log.logEndOffset) +assertEquals("expected a snapshot file per segment base offset, except the first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted) +assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) + +// Clean segments, this should delete everything except the active segment since there only +// exists the key "a". +cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset)) +log.deleteOldSegments() +// Sleep to breach the file delete delay and run scheduled file deletion tasks +mockTime.sleep(1) +assertEquals("expected a snapshot file per segment base offset, excluding the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), P
[GitHub] [kafka] gardnervickers commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on pull request #7929: URL: https://github.com/apache/kafka/pull/7929#issuecomment-705786361 @junrao I think that `Log.takeProducerSnapshot` is being used for testing in a few places in `LogTest`, though my IDE does not pick that up for some reason. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501970716 ## File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ## @@ -834,6 +834,40 @@ class ProducerStateManagerTest { assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset) } + @Test + def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = { +// Test that when stray snapshots are removed, the largest stray snapshot is kept around. This covers the case where +// the broker shutdown cleanly and emitted a snapshot file larger than the base offset of the active segment. + +// Create 3 snapshot files at different offsets. +Log.producerSnapshotFile(logDir, 42).createNewFile() +Log.producerSnapshotFile(logDir, 5).createNewFile() +Log.producerSnapshotFile(logDir, 2).createNewFile() + +// claim that we only have one segment with a base offset of 5 +stateManager.removeStraySnapshots(Set(5)) + +// The snapshot file at offset 2 should be considered a stray, but the snapshot at 42 should be kept +// around because it is the largest snapshot. +assertEquals(Some(42), stateManager.latestSnapshotOffset) +assertEquals(Some(5), stateManager.oldestSnapshotOffset) +assertEquals(Seq(5, 42), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted) + } + + @Test + def testRemoveAllStraySnapshots(): Unit = { +// Test that when stray snapshots are removed, all stray snapshots are removed when the base offset of the largest +// segment exceeds the offset of the largest stray snapshot. Review comment: Hmm, I think my comment here could be worded better. Offset `42` here is not a "stray", since we provide it along with the list of segmentBaseOffsets to `removeStraySnapshots`. I'll change up the wording on this, thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501968249 ## File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ## @@ -834,6 +834,40 @@ class ProducerStateManagerTest { assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset) } + @Test + def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = { +// Test that when stray snapshots are removed, the largest stray snapshot is kept around. This covers the case where +// the broker shutdown cleanly and emitted a snapshot file larger than the base offset of the active segment. + +// Create 3 snapshot files at different offsets. +Log.producerSnapshotFile(logDir, 42).createNewFile() +Log.producerSnapshotFile(logDir, 5).createNewFile() +Log.producerSnapshotFile(logDir, 2).createNewFile() + +// claim that we only have one segment with a base offset of 5 +stateManager.removeStraySnapshots(Set(5)) + +// The snapshot file at offset 2 should be considered a stray, but the snapshot at 42 should be kept +// around because it is the largest snapshot. +assertEquals(Some(42), stateManager.latestSnapshotOffset) +assertEquals(Some(5), stateManager.oldestSnapshotOffset) +assertEquals(Seq(5, 42), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted) + } + + @Test + def testRemoveAllStraySnapshots(): Unit = { +// Test that when stray snapshots are removed, all stray snapshots are removed when the base offset of the largest +// segment exceeds the offset of the largest stray snapshot. Review comment: I think this sentence is a bit confusing. Snapshot 42 is not meant to be a stray snapshot here, only 5 and 2 are. I will try to reword this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210410#comment-17210410 ] Matthias J. Sax commented on KAFKA-7334: [~vladimir_shadrin] I added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. Please make sure you assign tickets to yourself when starting to work on them (to avoid somebody else working on them in parallel). Thank you! > Suggest changing config for state.dir in case of FileNotFoundException > -- > > Key: KAFKA-7334 > URL: https://issues.apache.org/jira/browse/KAFKA-7334 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Vladimir >Priority: Major > Labels: newbie > Fix For: 2.7.0 > > > Quoting stack trace from KAFKA-5998 : > {code} > WARN [2018-08-22 03:17:03,745] > org.apache.kafka.streams.processor.internals.ProcessorStateManager: task > [0_45] Failed to write offset checkpoint file to > /tmp/kafka-streams/ > {{ /0_45/.checkpoint: {}}} > {{ ! java.nio.file.NoSuchFileException: > /tmp/kafka-streams//0_45/.checkpoint.tmp}} > {{ ! at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}} > {{ ! at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}} > {code} > When state.dir is left at the default configuration, there is a chance that > certain files under the state directory are cleaned by the OS, since the default > dir starts with /tmp/kafka-streams. > [~mjsax] and I proposed to suggest to the user, through the exception message, > that they change the location for state.dir . -- This message was sent by Atlassian Jira (v8.3.4#803005)
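For reference, the remedy the new exception message points at amounts to moving state.dir to a durable location. A minimal Scala sketch, with the application id and path as hypothetical placeholders:
{code}
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")            // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Move state off /tmp so the OS tmp cleaner cannot delete checkpoint files
// out from under a long-running Streams application.
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams") // hypothetical durable path
{code}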
[jira] [Assigned] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException
[ https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-7334: -- Assignee: Vladimir > Suggest changing config for state.dir in case of FileNotFoundException > -- > > Key: KAFKA-7334 > URL: https://issues.apache.org/jira/browse/KAFKA-7334 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Assignee: Vladimir >Priority: Major > Labels: newbie > Fix For: 2.7.0 > > > Quoting stack trace from KAFKA-5998 : > {code} > WARN [2018-08-22 03:17:03,745] > org.apache.kafka.streams.processor.internals.ProcessorStateManager: task > [0_45] Failed to write offset checkpoint file to > /tmp/kafka-streams/ > {{ /0_45/.checkpoint: {}}} > {{ ! java.nio.file.NoSuchFileException: > /tmp/kafka-streams//0_45/.checkpoint.tmp}} > {{ ! at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}} > {{ ! at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}} > {code} > When state.dir is left at the default configuration, there is a chance that > certain files under the state directory are cleaned by the OS, since the default > dir starts with /tmp/kafka-streams. > [~mjsax] and I proposed to suggest to the user, through the exception message, > that they change the location for state.dir . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9380: KAFKA-7334: Suggest changing config for state.dir in case of FileNotF…
mjsax commented on pull request #9380: URL: https://github.com/apache/kafka/pull/9380#issuecomment-705772843 Thanks for the PR @voffcheg109! Merged to `trunk` and cherry-picked to `2.7` branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #9380: KAFKA-7334: Suggest changing config for state.dir in case of FileNotF…
mjsax merged pull request #9380: URL: https://github.com/apache/kafka/pull/9380 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501957800 ## File path: core/src/test/scala/unit/kafka/log/LogTest.scala ## @@ -1226,6 +1225,104 @@ class LogTest { assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt) } + @Test + def testRetentionDeletesProducerStateSnapshots(): Unit = { +val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0) +val log = createLog(logDir, logConfig) +val pid1 = 1L +val epoch = 0.toShort + +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 0), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 1), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 2), leaderEpoch = 0) + +log.updateHighWatermark(log.logEndOffset) + +assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) +// Sleep to breach the retention period +mockTime.sleep(1000 * 60 + 1) +log.deleteOldSegments() +// Sleep to breach the file delete delay and run scheduled file deletion tasks +mockTime.sleep(1) +assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size) + } + + @Test + def testLogStartOffsetMovementDeletesSnapshots(): Unit = { +val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0) +val log = createLog(logDir, logConfig) +val pid1 = 1L +val epoch = 0.toShort + +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 0), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 1), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), producerId = pid1, + producerEpoch = epoch, sequence = 2), leaderEpoch = 0) +log.updateHighWatermark(log.logEndOffset) +assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) + +// Increment the log start offset to exclude the first two segments. 
+log.maybeIncrementLogStartOffset(log.logEndOffset - 1, ClientRecordDeletion) +log.deleteOldSegments() +// Sleep to breach the file delete delay and run scheduled file deletion tasks +mockTime.sleep(1) +assertEquals("expect a single producer state snapshot remaining", 1, ProducerStateManager.listSnapshotFiles(logDir).size) + } + + @Test + def testCompactionDeletesProducerStateSnapshots(): Unit = { +val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0) +val log = createLog(logDir, logConfig) +val pid1 = 1L +val epoch = 0.toShort +val cleaner = new Cleaner(id = 0, + offsetMap = new FakeOffsetMap(Int.MaxValue), + ioBufferSize = 64 * 1024, + maxIoBufferSize = 64 * 1024, + dupBufferLoadFactor = 0.75, + throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime), + time = mockTime, + checkDone = _ => {}) + +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "a".getBytes())), producerId = pid1, + producerEpoch = epoch, sequence = 0), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "b".getBytes())), producerId = pid1, + producerEpoch = epoch, sequence = 1), leaderEpoch = 0) +log.roll() +log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, "c".getBytes())), producerId = pid1, + producerEpoch = epoch, sequence = 2), leaderEpoch = 0) +log.updateHighWatermark(log.logEndOffset) +assertEquals("expected a snapshot file per segment base offset, except the first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted) +assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size) + +// Clean segments, this should delete everything except the active segment since there only +// exists the key "a". +cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset)) +log.deleteOldSegments() +// Sleep to breach the file delete delay and run scheduled file deletion tasks +mockTime.sleep(1) +assertEquals("expected a snapshot file per segment base offset, excluding the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), P
[GitHub] [kafka] lct45 commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 commented on a change in pull request #9383: URL: https://github.com/apache/kafka/pull/9383#discussion_r501956905 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -483,7 +485,7 @@ public void testEagerSubscription() { Collections.sort(subscription.topics()); assertEquals(asList("topic1", "topic2"), subscription.topics()); -final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks); Review comment: > goes back and forth between the two expected values What do you mean by expected values? Toggling between the default of `0` and `1` after `partitionAssignor.subscriptionUserData` is called the first time? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] efeg commented on pull request #9397: KAFKA-10583: Add documentation on the thread-safety of KafkaAdminClient.
efeg commented on pull request #9397: URL: https://github.com/apache/kafka/pull/9397#issuecomment-705765230 @cmccabe Do you think you might be able to take a look at this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] efeg opened a new pull request #9397: KAFKA-10583: Add documentation on the thread-safety of KafkaAdminClient.
efeg opened a new pull request #9397: URL: https://github.com/apache/kafka/pull/9397 Other than a Stack Overflow comment (see https://stackoverflow.com/a/61738065) by Colin Patrick McCabe (@cmccabe ) and a proposed design note on the KIP-117 wiki, there is no source that verifies the thread-safety of `KafkaAdminClient`. This patch updates the JavaDoc of `KafkaAdminClient` to clarify its thread-safety. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
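A short usage sketch of what the clarified JavaDoc permits: one shared client instance serving concurrent callers. The topic-listing call is an arbitrary example:
{code}
import java.util.Properties
import java.util.concurrent.Executors
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = Admin.create(props)

// A single thread-safe client shared across a pool, rather than one client per thread.
val pool = Executors.newFixedThreadPool(4)
(1 to 4).foreach { _ =>
  pool.submit(new Runnable {
    override def run(): Unit = println(admin.listTopics().names().get())
  })
}
pool.shutdown()
{code}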
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501944185 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition, // completed transactions whose markers are at offsets above the high watermark private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] + /** + * Load producer state snapshots by scanning the _logDir. + */ + private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = { +val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]() +for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) { + tm.put(f.offset, f) +} +tm + } + + /** + * Scans the log directory, gathering all producer state snapshot files. Snapshot files which do not have an offset + * corresponding to one of the provided offsets in segmentBaseOffsets will be removed, except in the case that there + * is a snapshot file at a higher offset than any offset in segmentBaseOffsets. + * + * The goal here is to remove any snapshot files which do not have an associated segment file, but not to remove + */ + private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = { +var latestStraySnapshot: Option[SnapshotFile] = None +val ss = loadSnapshots() +for (snapshot <- ss.values().asScala) { + val key = snapshot.offset + latestStraySnapshot match { +case Some(prev) => + if (!segmentBaseOffsets.contains(key)) { +// this snapshot is now the largest stray snapshot. +prev.deleteIfExists() +ss.remove(prev.offset) +latestStraySnapshot = Some(snapshot) + } +case None => + if (!segmentBaseOffsets.contains(key)) { +latestStraySnapshot = Some(snapshot) Review comment: We perform a check below which may cover this case. After setting the `snapshots` map, we look at the latest snapshot in the map. If the latest snapshot in the map is not equal to the `latestStraySnapshot`, we delete the `latestStraySnapshot`. I think this is a bit confusing though, so it might be better if instead we directly check that the `latestStraySnapshot` is larger than the largest offset in `segmentBaseOffsets`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
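The semantics converged on in this thread (and pinned down by the tests quoted earlier) can be distilled into a standalone helper. This is an illustrative sketch, not the PR's actual code:
{code}
// Given all snapshot offsets on disk and the base offsets of live segments,
// return the stray snapshot offsets to delete. Strays are snapshots with no
// matching segment; the largest stray is kept only when it lies beyond every
// live segment, since it may be the clean-shutdown snapshot of the active segment.
def straySnapshotsToDelete(snapshotOffsets: Seq[Long], segmentBaseOffsets: Set[Long]): Seq[Long] = {
  val strays = snapshotOffsets.filterNot(segmentBaseOffsets.contains).sorted
  strays match {
    case init :+ largest if segmentBaseOffsets.forall(largest > _) => init
    case all => all
  }
}
{code}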
[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
gardnervickers commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501942512 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition: TopicPartition, def takeSnapshot(): Unit = { // If not a new offset, then it is not worth taking another snapshot if (lastMapOffset > lastSnapOffset) { - val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset) + val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, lastMapOffset)) info(s"Writing producer snapshot at offset $lastMapOffset") - writeSnapshot(snapshotFile, producers) + writeSnapshot(snapshotFile.file, producers) + snapshots.put(snapshotFile.offset, snapshotFile) // Update the last snap offset according to the serialized map lastSnapOffset = lastMapOffset } } + /** + * Update the parentDir for this ProducerStateManager and all of the snapshot files which it manages. + */ + def updateParentDir(parentDir: File): Unit ={ +_logDir = parentDir +snapshots.forEach((_, s) => s.updateParentDir(parentDir)) + } + /** * Get the last offset (exclusive) of the latest snapshot file. */ - def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => offsetFromFile(file)) + def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset) /** * Get the last offset (exclusive) of the oldest snapshot file. */ - def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file)) + def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset) /** - * When we remove the head of the log due to retention, we need to remove snapshots older than - * the new log start offset. + * Remove any unreplicated transactions lower than the provided logStartOffset and bring the lastMapOffset forward + * if necessary. */ - def truncateHead(logStartOffset: Long): Unit = { + def onLogStartOffsetIncremented(logStartOffset: Long): Unit = { removeUnreplicatedTransactions(logStartOffset) if (lastMapOffset < logStartOffset) lastMapOffset = logStartOffset -deleteSnapshotsBefore(logStartOffset) Review comment: The idea here is to clear un-replicated transactions and optionally advance the `lastMapOffset` and `lastSnapOffset` when the logStartOffset is advanced, but to leave the snapshot files around. The corresponding snapshot files should be removed during the retention pass as we cleanup the associated segment files. I was attempting to optimize incrementing the logStartOffset a bit so that we don't need to delete the snapshot files from the request handler thread when handling `DELETE_RECORDS`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210386#comment-17210386 ] Matthias J. Sax commented on KAFKA-10585: - [~dongjin] – please assign the ticket to yourself if you plan to work on this. > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Rohan Desai >Priority: Minor > Labels: newbie++ > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
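To make the leftover directory concrete, a minimal Scala sketch with hypothetical ids and paths; note that cleanup() may only be called while the instance is not running:
{code}
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "query-42")          // hypothetical
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams") // hypothetical

val builder = new StreamsBuilder()
builder.stream[String, String]("input-topic")
val streams = new KafkaStreams(builder.build(), props)

// Removes task-level directories and the global state directory under
// /var/lib/kafka-streams/query-42, but (per this ticket) leaves the enclosing
// query-42 directory itself behind.
streams.cleanup()
{code}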
[jira] [Updated] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10585: Priority: Minor (was: Major) > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Rohan Desai >Priority: Minor > Labels: newbie++ > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-10585: Issue Type: Improvement (was: Bug) > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Rohan Desai >Priority: Minor > Labels: newbie++ > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10585: -- Labels: newbie++ (was: ) > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Rohan Desai >Priority: Major > Labels: newbie++ > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils
vvcephei commented on a change in pull request #9396: URL: https://github.com/apache/kafka/pull/9396#discussion_r501891955 ## File path: checkstyle/suppressions.xml ## @@ -194,13 +194,13 @@ files=".*[/\\]streams[/\\].*test[/\\].*.java"/> + files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/> Review comment: This test violates both measures of complexity by virtue of the way it works: the test specifically includes a lot of loops so that it can generate every combination of store and store builder configuration to verify that everything works as expected. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java ## @@ -162,4 +164,30 @@ public Headers headers() { public Record withHeaders(final Headers headers) { return new Record<>(key, value, timestamp, headers); } + +@Override +public String toString() { Review comment: added for quality-of-life debugging ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java ## @@ -75,18 +75,25 @@ public String name() { @Deprecated @Override public void init(final ProcessorContext context, final StateStore root) { -this.context = (InternalProcessorContext) context; - -final StreamsMetricsImpl metrics = this.context.metrics(); final String threadId = Thread.currentThread().getName(); final String taskName = context.taskId().toString(); -expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor( -threadId, -taskName, -metricScope, -name, -metrics -); + +// The provided context is not required to implement InternalProcessorContext, +// If it doesn't, we can't record this metric. +if (context instanceof InternalProcessorContext) { Review comment: As a testament to `MockProcessorContextStateStoreTest`, it actually found this bug. I had overlooked this usage while making the other root stores context-implementation agnostic in the last PR. ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java ## @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.ClientUtils; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; + +import java.io.File; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; + +import static org.apache.kafka.com
[GitHub] [kafka] junrao commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
junrao commented on pull request #7929: URL: https://github.com/apache/kafka/pull/7929#issuecomment-705727027 Also, it seems that Log.takeProducerSnapshot() is no longer used? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10271) Performance regression while fetching a key from a single partition
[ https://issues.apache.org/jira/browse/KAFKA-10271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-10271. --- Fix Version/s: (was: 2.5.2) 2.7.0 Resolution: Fixed > Performance regression while fetching a key from a single partition > --- > > Key: KAFKA-10271 > URL: https://issues.apache.org/jira/browse/KAFKA-10271 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0, 2.6.0, 2.5.1 >Reporter: Dima R >Assignee: Dima R >Priority: Major > Labels: KAFKA-10030, KAFKA-9445, KIP-562 > Fix For: 2.7.0, 2.6.1 > > Attachments: 9020.png > > > This is a follow-up bug for KAFKA-10030 > StreamThreadStateStoreProvider loops excessively over calls to > internalTopologyBuilder.topicGroups(), which is synchronized, causing > significant performance degradation to the caller, especially when the store has > many partitions. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-705723421 Cherry-picked to 2.6 as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils
vvcephei opened a new pull request #9396: URL: https://github.com/apache/kafka/pull/9396 Implements KIP-478 for the test-utils module: * adds mocks of the new ProcessorContext and StateStoreContext * adds tests that all stores and store builders are usable with the new mock * adds tests that the new Processor api is usable with the new mock * updates the demonstration Processor to the new api ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
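For orientation, a hedged sketch of how the new mock is meant to be used in a unit test; the names follow the existing test-utils MockProcessorContext and may differ in detail from the merged API:
{code}
import org.apache.kafka.streams.processor.api.{MockProcessorContext, Processor, ProcessorContext, Record}

// A toy new-PAPI processor that forwards the length of each value.
val processor = new Processor[String, String, String, Long] {
  var context: ProcessorContext[String, Long] = _
  override def init(ctx: ProcessorContext[String, Long]): Unit = context = ctx
  override def process(record: Record[String, String]): Unit =
    context.forward(record.withValue(record.value().length.toLong))
}

val context = new MockProcessorContext[String, Long]()
processor.init(context)
processor.process(new Record("k", "hello", 0L))
// The mock captures forwards instead of sending them downstream.
assert(context.forwarded().size() == 1)
{code}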
[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-705707869 Test passed, merged to trunk. Thanks @dima5rr for your great contribution! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
guozhangwang merged pull request #9020: URL: https://github.com/apache/kafka/pull/9020 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao removed a comment on pull request #9393: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis
junrao removed a comment on pull request #9393: URL: https://github.com/apache/kafka/pull/9393#issuecomment-705702092 In this PR, I have addressed the review comments from @chia7712 in #9001 which were provided after #9001 was merged. The changes are made mainly to KafkaAdminClient: Improve error message in updateFeatures api when feature name is empty. Propagate top-level error message in updateFeatures api. Add an empty-parameter variety for describeFeatures api. Minor documentation updates to @param and @return to make these resemble other apis. Reviewers: Chia-Ping Tsai , Jun Rao This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
ableegoldman commented on a change in pull request #9383: URL: https://github.com/apache/kafka/pull/9383#discussion_r501876646 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java ## @@ -59,6 +60,7 @@ ); private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80"; +private static final byte[] IGNORED_UNIQUE_FIELD = Bytes.EMPTY; Review comment: nit: let's use `new byte[1]` for this to make sure it's actually being ignored when it should be (since apparently it won't notice if you just pass in empty bytes for this field on a version < 8) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao merged pull request #9393: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis
junrao merged pull request #9393: URL: https://github.com/apache/kafka/pull/9393 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9393: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis
junrao commented on pull request #9393: URL: https://github.com/apache/kafka/pull/9393#issuecomment-705702092 In this PR, I have addressed the review comments from @chia7712 in #9001 which were provided after #9001 was merged. The changes are made mainly to KafkaAdminClient: Improve error message in updateFeatures api when feature name is empty. Propagate top-level error message in updateFeatures api. Add an empty-parameter variety for describeFeatures api. Minor documentation updates to @param and @return to make these resemble other apis. Reviewers: Chia-Ping Tsai , Jun Rao This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
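For context, the empty-parameter variety mentioned above would be used roughly as follows; a sketch against the KIP-584 admin API, with the output handling purely illustrative:
{code}
import java.util.Properties
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = Admin.create(props)

// No-arg variant: describe the finalized and supported feature versions
// of the cluster the client is connected to.
val metadata = admin.describeFeatures().featureMetadata().get()
println(metadata.finalizedFeatures())
println(metadata.supportedFeatures())
{code}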
[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
ableegoldman commented on a change in pull request #9383: URL: https://github.com/apache/kafka/pull/9383#discussion_r501875691 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java ## @@ -286,7 +286,8 @@ private static Properties streamsProperties(final String appId, mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "6"), mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"), - mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()) + mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()), +mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40) Review comment: Can we add a comment here explaining why we set the thread count so high? I feel like we'll forget and be really confused when we stumble across this in the future. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ivanyu opened a new pull request #9395: KAFKA-9726: Add LegacyReplicationPolicy for MM2
ivanyu opened a new pull request #9395: URL: https://github.com/apache/kafka/pull/9395 This commit adds a new replication policy for MirrorMaker 2, `LegacyReplicationPolicy`. This policy imitates the MirrorMaker 1 behavior of not renaming replicated topics. The exception is the `heartbeats` topic, which is replicated according to `DefaultReplicationPolicy`. Not renaming topics brings a number of limitations, the most important of which is the impossibility of detecting replication cycles. This makes cross-replication using `LegacyReplicationPolicy` effectively impossible. See the `LegacyReplicationPolicy` Javadoc for details. A new method `canTrackSource` is added to `ReplicationPolicy`. Its result indicates whether the replication policy can track back to the source topic of a topic. It is needed so that detection of target topics works when `LegacyReplicationPolicy` is used. On the testing side, the tests follow the same strategy as for `DefaultReplicationPolicy`, with necessary adjustments (e.g. no active/active replication is tested). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
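A hedged sketch of what an identity-style policy looks like against the ReplicationPolicy interface. This is illustrative only, not the PR's LegacyReplicationPolicy (which additionally special-cases the heartbeats topic):
{code}
import org.apache.kafka.connect.mirror.ReplicationPolicy

class IdentityReplicationPolicy extends ReplicationPolicy {
  // Replicated topics keep their source names: no cluster-alias prefix.
  override def formatRemoteTopic(sourceClusterAlias: String, topic: String): String = topic
  // With no prefix, the source cluster cannot be inferred from the name alone,
  // which is exactly why cycle detection breaks down with this family of policies.
  override def topicSource(topic: String): String = null
  override def upstreamTopic(topic: String): String = topic
}
{code}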
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r501873791 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +for (final StreamThread thread : threads) { +if (streamsUncaughtExceptionHandler != null) { Review comment: I agree with Bruno, I don't see why anyone would want to reset the exception handler (why set it in the first place then?). But if for some reason they really did set it at some point in their code and then later on want to revert to the default behavior, they can just pass in a handler that returns `SHUTDOWN_STREAMTHREAD_THREAD` themselves. If that's really what they want, then they should specify it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
ableegoldman commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r501868835 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +for (final StreamThread thread : threads) { Review comment: I think users may find at least some of the same functionality to be useful, in particular the "shutdown client" and "shutdown application" enums. I also feel like users may want to be able to restart the global thread. I think we can consider that out of scope for now, but I'd prefer to avoid introducing a new method that just accepts the old kind of uncaught exception handler if we're just going to deprecate that too. But I do agree that it's pretty different, and we also don't want to commit to adding this functionality for the global thread right away so we should make sure it's clear what is and isn't implemented. So what do you think about mirroring the new handler for the StreamThread, eg adding a "GlobalUncaughtExceptionHandler", and just only including the `SHUTDOWN_THREAD` enum in the initial version? That way we're set up to easily extend the global handling feature without necessarily needing to implement it right away This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
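To ground the discussion, a hedged sketch of how the new handler is meant to be used from application code. The enum and method names follow KIP-671 as eventually released and may not match this draft PR exactly; `topology` and `props` are assumed to be defined.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Assuming 'topology' and 'props' are already defined elsewhere.
KafkaStreams streams = new KafkaStreams(topology, props);
streams.setUncaughtExceptionHandler(exception -> {
    // must be threadsafe: the handler is shared by all stream threads
    if (exception instanceof IllegalStateException) {
        // likely a bug in user code: stop the whole application
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
    // otherwise replace the failed thread and keep processing
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
streams.start();
```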
[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories
junrao commented on a change in pull request #7929: URL: https://github.com/apache/kafka/pull/7929#discussion_r501325705 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition, // completed transactions whose markers are at offsets above the high watermark private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] + /** + * Load producer state snapshots by scanning the _logDir. + */ + private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = { +val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]() +for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) { Review comment: ProducerStateManager.listSnapshotFiles() could just be listSnapshotFiles()? ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: TopicPartition, // completed transactions whose markers are at offsets above the high watermark private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata] + /** + * Load producer state snapshots by scanning the _logDir. + */ + private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = { +val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]() +for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) { + tm.put(f.offset, f) +} +tm + } + + /** + * Scans the log directory, gathering all producer state snapshot files. Snapshot files which do not have an offset + * corresponding to one of the provided offsets in segmentBaseOffsets will be removed, except in the case that there + * is a snapshot file at a higher offset than any offset in segmentBaseOffsets. + * + * The goal here is to remove any snapshot files which do not have an associated segment file, but not to remove Review comment: Incomplete sentence after "but not to remove". ## File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala ## @@ -834,6 +834,40 @@ class ProducerStateManagerTest { assertEquals(None, stateManager.lastEntry(producerId).get.currentTxnFirstOffset) } + @Test + def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = { +// Test that when stray snapshots are removed, the largest stray snapshot is kept around. This covers the case where +// the broker shutdown cleanly and emitted a snapshot file larger than the base offset of the active segment. + +// Create 3 snapshot files at different offsets. +Log.producerSnapshotFile(logDir, 42).createNewFile() +Log.producerSnapshotFile(logDir, 5).createNewFile() +Log.producerSnapshotFile(logDir, 2).createNewFile() + +// claim that we only have one segment with a base offset of 5 +stateManager.removeStraySnapshots(Set(5)) + +// The snapshot file at offset 2 should be considered a stray, but the snapshot at 42 should be kept +// around because it is the largest snapshot. +assertEquals(Some(42), stateManager.latestSnapshotOffset) +assertEquals(Some(5), stateManager.oldestSnapshotOffset) +assertEquals(Seq(5, 42), ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted) + } + + @Test + def testRemoveAllStraySnapshots(): Unit = { +// Test that when stray snapshots are removed, all stray snapshots are removed when the base offset of the largest +// segment exceeds the offset of the largest stray snapshot. Review comment: Below, the base offset of the largest segment equals, rather than exceeds, the offset of the largest stray snapshot.
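The retention rule these tests pin down can be summarized compactly. Below is a hedged sketch, written in Java for illustration (the PR itself is Scala), with names that are not taken from the PR: a snapshot offset is kept if a segment shares its base offset, or if it is the single highest snapshot and lies strictly above every segment base offset (e.g. one written during a clean shutdown).

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class SnapshotRetention {
    // Illustrative rendering of the stray-snapshot cleanup semantics discussed above.
    static List<Long> retainedSnapshotOffsets(List<Long> snapshotOffsets, Set<Long> segmentBaseOffsets) {
        long max = snapshotOffsets.stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
        return snapshotOffsets.stream()
            .filter(off -> segmentBaseOffsets.contains(off)
                || (off == max && segmentBaseOffsets.stream().allMatch(base -> off > base)))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // mirrors testRemoveStraySnapshotsKeepCleanShutdownSnapshot: keep 5 and 42
        System.out.println(retainedSnapshotOffsets(List.of(42L, 5L, 2L), Set.of(5L))); // [5, 42]
        // when the largest segment base offset equals the largest snapshot offset,
        // every stray is removed (junrao's point above)
        System.out.println(retainedSnapshotOffsets(List.of(5L, 42L), Set.of(42L))); // [42]
    }
}
```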
[GitHub] [kafka] ableegoldman commented on pull request #9380: KAFKA-7334: Suggest changing config for state.dir in case of FileNotF…
ableegoldman commented on pull request #9380: URL: https://github.com/apache/kafka/pull/9380#issuecomment-705689770 Tests passed, should be good to merge @mjsax. Btw the 2.7 branch was just cut, so this should be cherry-picked back to 2.7. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 commented on pull request #9383: URL: https://github.com/apache/kafka/pull/9383#issuecomment-705688523 System tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4204/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10589) Rename kafka-replica-verification CLI command line arguments for KIP-629
Xavier Léauté created KAFKA-10589: - Summary: Rename kafka-replica-verification CLI command line arguments for KIP-629 Key: KAFKA-10589 URL: https://issues.apache.org/jira/browse/KAFKA-10589 Project: Kafka Issue Type: Sub-task Reporter: Xavier Léauté -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10588) Rename kafka-console-consumer CLI command line arguments for KIP-629
Xavier Léauté created KAFKA-10588: - Summary: Rename kafka-console-consumer CLI command line arguments for KIP-629 Key: KAFKA-10588 URL: https://issues.apache.org/jira/browse/KAFKA-10588 Project: Kafka Issue Type: Sub-task Reporter: Xavier Léauté -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10587) Rename kafka-mirror-maker CLI command line arguments for KIP-629
Xavier Léauté created KAFKA-10587: - Summary: Rename kafka-mirror-maker CLI command line arguments for KIP-629 Key: KAFKA-10587 URL: https://issues.apache.org/jira/browse/KAFKA-10587 Project: Kafka Issue Type: Sub-task Reporter: Xavier Léauté -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9679) Mock consumer should behave consistently with the actual consumer
[ https://issues.apache.org/jira/browse/KAFKA-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sheikh Araf updated KAFKA-9679: --- Attachment: (was: image.png) > Mock consumer should behave consistently with the actual consumer > --- > > Key: KAFKA-9679 > URL: https://issues.apache.org/jira/browse/KAFKA-9679 > Project: Kafka > Issue Type: Test > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Sujay Hegde >Priority: Major > Labels: help-wanted, newbie, newbie++ > > Right now, MockConsumer throws an IllegalStateException when the buffered records cannot be matched to the assigned partitions. This is not the case for KafkaConsumer, which simply does not return those records during `poll()` calls. This inconsistent behavior should be fixed. > Note that if we take this fix, the full unit test suite needs to be executed to make sure no regression is introduced, as some tests potentially depend on the current MockConsumer behavior. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9679) Mock consumer should behave consistently with the actual consumer
[ https://issues.apache.org/jira/browse/KAFKA-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sheikh Araf updated KAFKA-9679: --- Attachment: image.png > Mock consumer should behave consistently with the actual consumer > --- > > Key: KAFKA-9679 > URL: https://issues.apache.org/jira/browse/KAFKA-9679 > Project: Kafka > Issue Type: Test > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Sujay Hegde >Priority: Major > Labels: help-wanted, newbie, newbie++ > > Right now, MockConsumer throws an IllegalStateException when the buffered records cannot be matched to the assigned partitions. This is not the case for KafkaConsumer, which simply does not return those records during `poll()` calls. This inconsistent behavior should be fixed. > Note that if we take this fix, the full unit test suite needs to be executed to make sure no regression is introduced, as some tests potentially depend on the current MockConsumer behavior. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] xvrl commented on pull request #9366: KAFKA-10571 Replace blackout with backoff for KIP-629
xvrl commented on pull request #9366: URL: https://github.com/apache/kafka/pull/9366#issuecomment-705676711 CI seems broken, can we trigger tests again for this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] xvrl commented on pull request #9366: KAFKA-10571 Replace blackout with backoff for KIP-629
xvrl commented on pull request #9366: URL: https://github.com/apache/kafka/pull/9366#issuecomment-705676790 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dima5rr removed a comment on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
dima5rr removed a comment on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-705675283 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210308#comment-17210308 ] Rohan Desai commented on KAFKA-10585: - Yes, this is a good summary. > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Rohan Desai >Priority: Major > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition
dima5rr commented on pull request #9020: URL: https://github.com/apache/kafka/pull/9020#issuecomment-705675283 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
cadonna commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r501802252 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -617,7 +619,19 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); try { -streams.setUncaughtExceptionHandler(null); + streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null); +fail("Should throw IllegalStateException"); +} catch (final IllegalStateException e) { +// expected +} +} Review comment: Sorry, my bad! It doesn't matter whether it is called or not, since the `IllegalStateException` is thrown. ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -617,7 +619,19 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); try { -streams.setUncaughtExceptionHandler(null); + streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null); +fail("Should throw IllegalStateException"); +} catch (final IllegalStateException e) { +// expected +} +} + +@Test +public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() { Review comment: Actually, there are two tests missing. With the test you added, you test the `else` branch on line 400 in `KafkaStreams`, but you do not test the `then` branch of the same `if` statement, and within the `then` branch there are again a `then` branch and an `else` branch to test. If you do without the inner `else` branch as I proposed in another comment, you need to test the exception that is thrown when `null` is passed to the method. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
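A hedged sketch of one of the missing tests described here, reusing the helpers visible in the diff (`getBuilderWithSource`, `props`, `supplier`, `time`); the exact expected exception depends on the final null-handling contract, and `assertThrows` is assumed to be available from the project's JUnit version.

```java
@Test
public void shouldThrowOnNullStreamsUncaughtExceptionHandlerInCreatedState() {
    final KafkaStreams streams =
        new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
    // still in CREATED state, so we reach the handler-setting branch; with the
    // inner `else` removed as suggested, null should be rejected outright
    assertThrows(NullPointerException.class,
        () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
}
```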
[GitHub] [kafka] cadonna commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler
cadonna commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r501793433 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. + * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler handler = throwable -> handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +for (final StreamThread thread : threads) { +if (streamsUncaughtExceptionHandler != null) { Review comment: I do not think we need to emulate the behavior of the java uncaught exception handler here. I do not see why a user should try to reset the uncaught exception handler. If we receive this requirement, we can still add it. I have the impression the "reset" makes this code unnecessarily complex. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
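A hedged sketch of the simpler contract suggested here, rejecting null instead of supporting a reset. Internal names such as `stateLock`, `threads`, `state`, and `handleStreamsUncaughtException` come from the quoted diff; `setStreamsUncaughtExceptionHandler` on the thread is assumed for illustration, and `java.util.Objects` is assumed to be imported.

```java
public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler userHandler) {
    Objects.requireNonNull(userHandler, "streamsUncaughtExceptionHandler must not be null");
    synchronized (stateLock) {
        if (state != State.CREATED) {
            throw new IllegalStateException(
                "Can only set UncaughtExceptionHandler in CREATED state. Current state is: " + state);
        }
        // no null/reset branch: every thread simply gets the wrapped handler
        for (final StreamThread thread : threads) {
            thread.setStreamsUncaughtExceptionHandler(
                throwable -> handleStreamsUncaughtException(throwable, userHandler));
        }
    }
}
```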
[GitHub] [kafka] mimaison merged pull request #9271: MINOR: correct package of LinuxIoMetricsCollector
mimaison merged pull request #9271: URL: https://github.com/apache/kafka/pull/9271 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector
mimaison commented on pull request #9271: URL: https://github.com/apache/kafka/pull/9271#issuecomment-705628291 Test failures are not related and passed locally, merging This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] acmck closed pull request #9394: Fully automate dev setup with Gitpod
acmck closed pull request #9394: URL: https://github.com/apache/kafka/pull/9394 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] acmck opened a new pull request #9394: Fully automate dev setup with Gitpod
acmck opened a new pull request #9394: URL: https://github.com/apache/kafka/pull/9394 This commit implements a fully-automated development setup using Gitpod.io, an online IDE for GitLab, GitHub, and Bitbucket that enables Dev-Environments-As-Code. This makes it easy for anyone to get a ready-to-code workspace for any branch, issue or pull request almost instantly with a single click. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup
[ https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210251#comment-17210251 ] Dongjin Lee commented on KAFKA-10585: - I inspected this issue a little bit. You mean that everything under `{state.dir}/{application-id}/` is deleted, but `{state.dir}/{application-id}` itself is not deleted on cleanup, and so keeps occupying filesystem resources. Do I understand correctly? > Kafka Streams should clean up the state store directory from cleanup > > > Key: KAFKA-10585 > URL: https://issues.apache.org/jira/browse/KAFKA-10585 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Rohan Desai >Priority: Major > > Currently, `KafkaStreams.cleanup` cleans up all the task-level directories > and the global directory. However it doesn't clean up the enclosing state > store directory, though streams does create this directory when it > initializes the state for the streams app. Feels like it should remove this > directory when it cleans up. > We notice this in ksql quite often, since every new query is a new streams > app. Over time, we see lots of state store directories left around for old > queries. -- This message was sent by Atlassian Jira (v8.3.4#803005)
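A hedged sketch of the extra step the issue asks for; the method and parameter names here are illustrative, not the proposed patch, and `stateDir`/`applicationId` mirror the `state.dir` and `application.id` configs.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.kafka.streams.KafkaStreams;

public class FullCleanup {
    static void fullCleanup(KafkaStreams streams, String stateDir, String applicationId) throws Exception {
        streams.cleanUp(); // removes task-level directories and global state
        // proposed addition: also remove the now-empty enclosing app directory,
        // i.e. {state.dir}/{application.id} itself
        Files.deleteIfExists(Paths.get(stateDir, applicationId));
    }
}
```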
[jira] [Created] (KAFKA-10586) MirrorMaker 2.0 REST support
Daniel Urban created KAFKA-10586: Summary: MirrorMaker 2.0 REST support Key: KAFKA-10586 URL: https://issues.apache.org/jira/browse/KAFKA-10586 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban Assignee: Daniel Urban KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that with specific workloads, the dedicated MM2 cluster can become unable to react to dynamic topic and group filter changes. (This occurs when, after a rebalance operation, the leader node has no MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is stopped on the leader, meaning it cannot detect config changes by itself. Followers still running the connector can detect config changes, but they cannot query the leader for config updates.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison merged pull request #9296: MINOR: remove unused scala files from core module
mimaison merged pull request #9296: URL: https://github.com/apache/kafka/pull/9296 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org