[GitHub] [kafka] xvrl opened a new pull request #9405: MINOR internal KIP-629 changes to methods and variables

2020-10-08 Thread GitBox


xvrl opened a new pull request #9405:
URL: https://github.com/apache/kafka/pull/9405


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xvrl opened a new pull request #9404: KAFKA-10589 replica verification tool changes for KIP-629

2020-10-08 Thread GitBox


xvrl opened a new pull request #9404:
URL: https://github.com/apache/kafka/pull/9404


   depends on #9400, ignore first commit
   still needs backwards compatibility changes







[jira] [Assigned] (KAFKA-10589) Rename kafka-replica-verification CLI command line arguments for KIP-629

2020-10-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté reassigned KAFKA-10589:
-

Assignee: Xavier Léauté

> Rename kafka-replica-verification CLI command line arguments for KIP-629
> 
>
> Key: KAFKA-10589
> URL: https://issues.apache.org/jira/browse/KAFKA-10589
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] xvrl opened a new pull request #9403: KAFKA-10573 Update connect transforms configs for KIP-629

2020-10-08 Thread GitBox


xvrl opened a new pull request #9403:
URL: https://github.com/apache/kafka/pull/9403


   depends on #9367 (KAFKA-10570) - ignore the first commit until the other PR 
is merged
   
   cc @rhauch for review







[jira] [Assigned] (KAFKA-10588) Rename kafka-console-consumer CLI command line arguments for KIP-629

2020-10-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté reassigned KAFKA-10588:
-

Assignee: Xavier Léauté

> Rename kafka-console-consumer CLI command line arguments for KIP-629
> 
>
> Key: KAFKA-10588
> URL: https://issues.apache.org/jira/browse/KAFKA-10588
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>






[GitHub] [kafka] xvrl opened a new pull request #9402: update console consumer arguments for KIP-629

2020-10-08 Thread GitBox


xvrl opened a new pull request #9402:
URL: https://github.com/apache/kafka/pull/9402


   draft PR, more changes needed in order to ensure backwards compatibility







[GitHub] [kafka] huxihx commented on pull request #9399: KAFKA-10584:IndexSearchType should use sealed trait instead of Enumeration

2020-10-08 Thread GitBox


huxihx commented on pull request #9399:
URL: https://github.com/apache/kafka/pull/9399#issuecomment-705990045


   @junrao Please review this minor change.







[GitHub] [kafka] chia7712 commented on pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList

2020-10-08 Thread GitBox


chia7712 commented on pull request #9400:
URL: https://github.com/apache/kafka/pull/9400#issuecomment-705989392


   > this was already discussed as part of KIP-629. The term "include" was 
chosen to align with existing configs we already had. Please see the KIP and 
the mailing list discussion
   
   Got it. Thanks for the information!







[GitHub] [kafka] xvrl commented on pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList

2020-10-08 Thread GitBox


xvrl commented on pull request #9400:
URL: https://github.com/apache/kafka/pull/9400#issuecomment-705987912


   @chia7712 this was already discussed as part of KIP-629. The term "include" 
was chosen to align with existing configs we already had. Please see the 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-629:+Use+racially+neutral+terms+in+our+codebase)
 and [the mailing list 
discussion](https://lists.apache.org/thread.html/rbe19a71644c85c53de7ea5cfa00e4c90f530332f09758f24709b81f6%40%3Cdev.kafka.apache.org%3E)







[GitHub] [kafka] chia7712 opened a new pull request #9401: KAFKA-9628 Replace Produce request with automated protocol

2020-10-08 Thread GitBox


chia7712 opened a new pull request #9401:
URL: https://github.com/apache/kafka/pull/9401


   issue: https://issues.apache.org/jira/browse/KAFKA-9628
   
   this PR is a part of KAFKA-9628.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] chia7712 commented on pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList

2020-10-08 Thread GitBox


chia7712 commented on pull request #9400:
URL: https://github.com/apache/kafka/pull/9400#issuecomment-705984805


   Is ```allowList``` more suitable to replace ```whiteList```?







[jira] [Commented] (KAFKA-9628) Replace Produce request/response with automated protocol

2020-10-08 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210629#comment-17210629
 ] 

Chia-Ping Tsai commented on KAFKA-9628:
---

> I've reassigned it to you, go for it

thanks!

I will file two PRs to address the request and the response individually.

> Replace Produce request/response with automated protocol
> 
>
> Key: KAFKA-9628
> URL: https://issues.apache.org/jira/browse/KAFKA-9628
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Chia-Ping Tsai
>Priority: Major
>






[GitHub] [kafka] xvrl opened a new pull request #9400: MINOR rename kafka.utils.Whitelist to IncludeList

2020-10-08 Thread GitBox


xvrl opened a new pull request #9400:
URL: https://github.com/apache/kafka/pull/9400


   rename internal classes and methods for KIP-629







[GitHub] [kafka] chia7712 commented on pull request #9318: KAFKA-10497 Convert group coordinator metadata schemas to use generat…

2020-10-08 Thread GitBox


chia7712 commented on pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#issuecomment-705979334


   @hachikuji @dajac Could you take a look?







[jira] [Created] (KAFKA-10590) Whole Kafka node hangs when one node goes down.

2020-10-08 Thread sandeep p (Jira)
sandeep p created KAFKA-10590:
-

 Summary: Whole Kafka node hangs when one node goes down.
 Key: KAFKA-10590
 URL: https://issues.apache.org/jira/browse/KAFKA-10590
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect, zkclient
Affects Versions: 2.5.0
 Environment: Ubuntu 16.04
Reporter: sandeep p


The whole cluster hangs when one of the three nodes goes down. To bring the cluster 
back, all three nodes need to be restarted.

 

[2020-10-08 19:40:13,607] WARN Client session timed out, have not heard from 
server in 12002ms for sessionid 0x2acefe0 (org.apache.zookeeper.ClientCnxn)
 [2020-10-08 19:40:13,608] INFO Client session timed out, have not heard from 
server in 12002ms for sessionid 0x2acefe0, closing socket connection and 
attempting reconnect (org.apache.zookeeper.ClientCnxn)
 [2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
 [2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
 [2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
 [2020-10-08 19:40:13,709] INFO [ZooKeeperClient Kafka server] Connected. 
(kafka.zookeeper.ZooKeeperClient)
 [2020-10-08 19:40:13,866] INFO Opening socket connection to server 
10.0.14.7/10.0.14.7:2181. Will not attempt to authenticate using SASL (unknown 
error) (org.apache.zookeeper.ClientCnxn)
 [2020-10-08 19:40:13,867] INFO Socket error occurred: 
10.0.14.7/10.0.14.7:2181: Connection refused (org.apache.zookeeper.ClientCnxn)
 [2020-10-08 19:40:13,968] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
 [2020-10-08 19:40:13,968] INFO [ZooKeeperClient Kafka server] Waiting until 
connected. (kafka.zookeeper.ZooKeeperClient)
 [2020-10-08 19:40:14,093] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Connection to node 1 (/10.0.2.5:9092) could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
 [2020-10-08 19:40:14,093] INFO [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error sending fetch request (sessionId=205463854, epoch=INITIAL) 
to node 1: {}. (org.apache.kafka.clients.FetchSessionHandler)
 java.io.IOException: Connection to 10.0.2.5:9092 (id: 1 rack: null) failed.
 at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)
 at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:103)
 at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:206)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:300)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:135)
 at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:134)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)





[jira] [Comment Edited] (KAFKA-9726) LegacyReplicationPolicy for MM2 to mimic MM1

2020-10-08 Thread Ivan Yurchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210367#comment-17210367
 ] 

Ivan Yurchenko edited comment on KAFKA-9726 at 10/9/20, 4:35 AM:
-

[~ryannedolan] I've created a PR https://github.com/apache/kafka/pull/9395. 
Please have a look when you have a chance.


was (Author: ivanyu):
https://github.com/apache/kafka/pull/9395

> LegacyReplicationPolicy for MM2 to mimic MM1
> 
>
> Key: KAFKA-9726
> URL: https://issues.apache.org/jira/browse/KAFKA-9726
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Ivan Yurchenko
>Priority: Minor
>
> Per KIP-382, we should support MM2 in "legacy mode", i.e. with behavior 
> similar to MM1. A key requirement for this is a ReplicationPolicy that does 
> not rename topics.
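
The gist of such a policy is to leave replicated topic names untouched. A very rough Scala sketch follows (purely an illustration, not the code in the PR linked above; it assumes the MM2 ReplicationPolicy interface exposes formatRemoteTopic, topicSource and upstreamTopic as its abstract methods, and the class name is made up):

{code}
import org.apache.kafka.connect.mirror.ReplicationPolicy

class NoRenameReplicationPolicy extends ReplicationPolicy {

  // MM1-style behaviour: replicated topics keep their original names
  override def formatRemoteTopic(sourceClusterAlias: String, topic: String): String = topic

  // without a cluster-alias prefix, the source cluster cannot be derived from the name
  override def topicSource(topic: String): String = null

  // for the same reason, the upstream name is just the topic itself
  override def upstreamTopic(topic: String): String = topic
}
{code}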





[jira] [Commented] (KAFKA-10555) Improve client state machine

2020-10-08 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210606#comment-17210606
 ] 

Sophie Blee-Goldman commented on KAFKA-10555:
-

> Thus, it won't be possible to add a new thread when in the NOT_RUNNING state 
> following the current proposal of the KIP.

Ah, right. Alright, I just went back and re-read the final versions of KIP-671 
and KIP-663, and here's my summary of the overall current proposal across the 
relevant scenarios we need to consider:
 1. User initiates client shutdown via KafkaStreams.close() --> current behavior 
is to transition to NOT_RUNNING (even if an error occurs during shutdown)
 2. User initiates application shutdown via the SHUTDOWN_KAFKA_STREAMS_APPLICATION 
enum --> current proposal is to transition to ERROR
 3. User removes the last thread via KafkaStreams.removeStreamThread() --> current 
proposal is to transition to ERROR
 4. User initiates client shutdown via the SHUTDOWN_KAFKA_STREAMS_CLIENT enum --> 
current proposal is to transition to ERROR
 5. Last thread is allowed to die via the SHUTDOWN_STREAM_THREAD enum --> current 
proposal is to transition to ERROR
 6. New thread is started via KafkaStreams#addStreamThread --> current proposal 
is that this is only possible in REBALANCING or RUNNING

Just to make sure we're all on the same page, note that under the current 
semantics the ERROR state is technically _not_ a terminal state. We enter ERROR 
when the last thread dies, at which point _it's the responsibility of the user 
to shut down the app_. So the only transition out of ERROR is to NOT_RUNNING 
(which is, currently, the terminal state), but if a user doesn't manually invoke 
close() the client will just hang out in ERROR forever, which means the cleanup 
thread, metrics thread, etc. continue on.

Personally, I did not realize that until recently, and think it's one of the 
top things to take this opportunity to reconsider. For one thing, if the only 
thing a user can possibly do once in the ERROR state is call close() and 
transition out of it, then why not just do it for them? I'd rather not leave 
the door open to partially-closed zombie Streams applications. For the rest of 
this doc I'm assuming that we reimagine the ERROR state to run parallel to the 
NOT_RUNNING state rather than being upstream of it, and an application entering 
ERROR is always, and automatically, shut down.

 

Taking a step back, in the current, pre-KIP-663/671 world, it seems like 
Streams application operators face a tough choice: what to do if the instance 
goes into ERROR? If availability is most important you probably just 
automatically restart it, e.g. via `new KafkaStreams...start()` or by killing the 
app to restart the pod, etc. But if the error was truly fatal, or potentially 
corrupting, restarting it would be anywhere from useless to disastrous 
(consider repeated overcounting). So if you're a #safetyfirst kind of operator, 
you'd want to shut everything down and inspect/resolve the error before 
restarting things.

Unfortunately right now there's no way for an operator to know when it's safe 
to automatically restart, and when manual intervention is merited. So they have 
to make a choice whether to automatically restart or not, and whichever 
approach they choose is guaranteed to be wrong some of the time.

It seems like the ERROR vs NOT_RUNNING states are actually a natural solution 
to this problem: we say that NOT_RUNNING is recoverable and safe to restart 
however you like, including adding new threads. We then reserve the ERROR state 
for truly fatal errors that should be investigated and resolved before 
continuing. So ERROR basically means "restart at your own risk". In a 
post-KIP-663/671 world, users can listen in on the State to determine whether 
they should automatically restart.
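
To make that concrete, here is a minimal sketch of what an operator could do with the existing KafkaStreams#setStateListener hook under the semantics proposed above (the restart/alert helpers are hypothetical stand-ins for whatever the operator's tooling does):

{code}
import org.apache.kafka.streams.KafkaStreams

object StreamsRestartPolicy {
  // hypothetical operator hooks
  def scheduleAutomaticRestart(): Unit = ()
  def alertOperatorToInvestigate(): Unit = ()

  def install(streams: KafkaStreams): Unit =
    streams.setStateListener(new KafkaStreams.StateListener {
      override def onChange(newState: KafkaStreams.State, oldState: KafkaStreams.State): Unit =
        newState match {
          case KafkaStreams.State.NOT_RUNNING =>
            // recoverable under the proposed semantics: safe to restart automatically
            scheduleAutomaticRestart()
          case KafkaStreams.State.ERROR =>
            // reserved for truly fatal errors: "restart at your own risk"
            alertOperatorToInvestigate()
          case _ => // ignore intermediate states
        }
    })
}
{code}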

This brings up the obvious question: which errors are which? I know I've said a 
lot without even touching on the actual question at hand, when to transition to 
what. Some of these cases feel pretty straightforward to me, some not so much.

Case #1: The current behavior of KafkaStreams.close() transitioning to 
NOT_RUNNING feels natural to me

Case #2: The only reasonable way to interpret the SHUTDOWN_APPLICATION 
directive is to transition all clients to ERROR. I can't think of any reason 
you would want to shut down every instance in the application and then just 
restart it. If the triggering exception is bad enough to necessitate the 
nuclear option, well, that's probably not something you can just shrug off and 
continue.

Case #3: Removing the last stream thread should result in NOT_RUNNING, or 
possibly it could just remain in RUNNING. I am a bit concerned that this might 
go unnoticed, though. And then we'll get (another) escalation on Saturday 
night about consumer lag growing indefinitely. 

Case #4: This seems like the trickiest scenario to me. I honestly don't think 
we can make this call for the us

[GitHub] [kafka] guozhangwang commented on pull request #9352: KAFKA-10533; KafkaRaftClient should flush log after appends

2020-10-08 Thread GitBox


guozhangwang commented on pull request #9352:
URL: https://github.com/apache/kafka/pull/9352#issuecomment-705954882


   test this please







[jira] [Assigned] (KAFKA-10573) Rename connect transform configs for KIP-629

2020-10-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté reassigned KAFKA-10573:
-

Assignee: Xavier Léauté

> Rename connect transform configs for KIP-629
> 
>
> Key: KAFKA-10573
> URL: https://issues.apache.org/jira/browse/KAFKA-10573
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>






[GitHub] [kafka] xvrl commented on a change in pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629

2020-10-08 Thread GitBox


xvrl commented on a change in pull request #9367:
URL: https://github.com/apache/kafka/pull/9367#discussion_r502167123



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConfigUtils {
+
+private static final Logger log = 
LoggerFactory.getLogger(ConfigUtils.class);
+
+/**
+ * Translates deprecated configurations into their non-deprecated 
equivalents
+ *
+ * @param configs the input configuration
+ * @param aliasGroups An array of arrays of synonyms.  Each synonym array 
begins with the non-deprecated synonym
+ *For example, new String[][] { { a, b }, { c, d, e} }
+ *would declare b as a deprecated synonym for a,
+ *and d and e as deprecated synonyms for c.
+ * @return a new configuration map with deprecated  keys translated to 
their non-deprecated equivalents
+ */
+public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T> configs, String[][] aliasGroups) {
+Set<String> aliasSet = Stream.of(aliasGroups).flatMap(Stream::of).collect(Collectors.toSet());
+
+// pass through all configurations without aliases
+Map<String, T> newConfigs = configs.entrySet().stream()
+.filter(e -> !aliasSet.contains(e.getKey()))
+// filter out null values
+.filter(e -> Objects.nonNull(e.getValue()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+Stream.of(aliasGroups).forEachOrdered(aliasGroup -> {
+String target = aliasGroup[0];
+
+List<String> deprecated = Stream.of(aliasGroup)
+.skip(1) // skip target
+.filter(configs::containsKey)
+.collect(Collectors.toList());
+
+if (deprecated.isEmpty()) {
+// No deprecated key(s) found.
+if (configs.containsKey(target)) {
+newConfigs.put(target, configs.get(target));
+}
+return;
+}
+
+String aliasString = String.join(", ", deprecated);
+
+if (configs.containsKey(target)) {
+// Ignore the deprecated key(s) because the actual key was set.
+log.error(target + " was configured, as well as the deprecated 
alias(es) " +
+  aliasString + ".  Using the value of " + target);
+newConfigs.put(target, configs.get(target));
+} else if (deprecated.size() > 1) {
+log.error("The configuration keys " + aliasString + " are 
deprecated and may be " +

Review comment:
   you have to thank @cmccabe for that ;)
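
   For anyone skimming the diff, a rough usage sketch of the utility above (the config key names here are made-up placeholders, not the actual JmxReporter properties):

```scala
import org.apache.kafka.common.utils.ConfigUtils

object TranslateDeprecatedConfigsExample extends App {
  val configs = new java.util.HashMap[String, String]()
  configs.put("metrics.old-name", ".*") // hypothetical deprecated alias

  // the first entry of each alias group is the non-deprecated key,
  // the remaining entries are its deprecated synonyms
  val translated = ConfigUtils.translateDeprecatedConfigs(
    configs, Array(Array("metrics.new-name", "metrics.old-name")))

  // the value set under the deprecated alias is surfaced under the new key
  println(translated.get("metrics.new-name")) // prints ".*"
}
```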









[jira] [Resolved] (KAFKA-10571) Replace occurrences of blackout with backoff for KIP-629

2020-10-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-10571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xavier Léauté resolved KAFKA-10571.
---
Resolution: Fixed

> Replace occurrences of blackout with backoff for KIP-629
> 
>
> Key: KAFKA-10571
> URL: https://issues.apache.org/jira/browse/KAFKA-10571
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>






[GitHub] [kafka] huxihx opened a new pull request #9399: KAFKA-10584:IndexSearchType should use sealed trait instead of Enumeration

2020-10-08 Thread GitBox


huxihx opened a new pull request #9399:
URL: https://github.com/apache/kafka/pull/9399


   https://issues.apache.org/jira/browse/KAFKA-10584
   
   In Scala, we prefer sealed traits over Enumeration since the former gives 
you exhaustiveness checking. With Scala Enumeration, you don't get a warning if 
you add a new value that is not handled in a given pattern match.
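
   To illustrate the difference (the names below are made up for the example, not the actual IndexSearchType code in this PR):

```scala
object EnumerationVersion {
  object SearchType extends Enumeration {
    val Key, Timestamp = Value
  }

  // compiles without any warning even though Timestamp is not handled,
  // and fails at runtime with a MatchError instead
  def describe(t: SearchType.Value): String = t match {
    case SearchType.Key => "search by key"
  }
}

object SealedTraitVersion {
  sealed trait SearchType
  case object Key extends SearchType
  case object Timestamp extends SearchType

  // dropping the Timestamp case here would produce a
  // "match may not be exhaustive" warning at compile time
  def describe(t: SearchType): String = t match {
    case Key       => "search by key"
    case Timestamp => "search by timestamp"
  }
}
```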
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Assigned] (KAFKA-10584) IndexSearchType should use sealed trait instead of Enumeration

2020-10-08 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-10584:
--

Assignee: huxihx

> IndexSearchType should use sealed trait instead of Enumeration
> --
>
> Key: KAFKA-10584
> URL: https://issues.apache.org/jira/browse/KAFKA-10584
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
>  Labels: newbie
>
> In Scala, we prefer sealed traits over Enumeration since the former gives you 
> exhaustiveness checking. With Scala Enumeration, you don't get a warning if 
> you add a new value that is not handled in a given pattern match.





[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-08 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502092508



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1242,7 +1314,56 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == "connection-accept-rate" &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey("ip")
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+
+ip match {
+  case Some(addr) =>
+val address = InetAddress.getByName(addr)
+if (maxConnectionRate.isDefined) {
+  info(s"Updating max connection rate override for $address to 
${maxConnectionRate.get}")
+  connectionRatePerIp.put(address, maxConnectionRate.get)
+} else {
+  info(s"Removing max connection rate override for $address")
+  connectionRatePerIp.remove(address)
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+val newQuota = maxConnectionRate.getOrElse(Int.MaxValue)

Review comment:
   actually in this case, it should reset to 
`DynamicConfig.Ip.DefaultConnectionCreationRate` right?









[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-08 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502092158



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, 
ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = 
DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = 
TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = 
createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = 
getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, 
BrokerQuotaEntity)
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, 
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+  val startThrottleTimeMs = time.milliseconds
+
+  val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, 
startThrottleTimeMs)

Review comment:
   I see. Yes, I think unrecording is more efficient than keeping more 
delayed connections than needed. Basically, when you unrecord from the per-IP 
metric, you can also unrecord from the broker and listener metrics as well.









[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-08 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502091435



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, 
ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = 
DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = 
TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = 
createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = 
getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, 
BrokerQuotaEntity)
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, 
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+  val startThrottleTimeMs = time.milliseconds
+
+  val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, 
startThrottleTimeMs)

Review comment:
   @apovzner 
   My reasoning for this is the following: consider the case where we accept a 
connection at the broker/listener level, but reject it at the IP level. We would 
have already recorded the broker connection, so we'd be allocating rate to a 
rejected connection.
   
   I suppose we can work around this in a similar manner to 
`recordIpConnectionMaybeThrottle`, by unrecording the listener/broker connection 
if the IP gets rejected.









[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-08 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502091323



##
File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala
##
@@ -409,6 +409,67 @@ class ConnectionQuotasTest {
 verifyConnectionCountOnEveryListener(connectionQuotas, 
connectionsPerListener)
   }
 
+  @Test

Review comment:
   It would be useful to add a test where we have both a per-listener and a 
per-IP limit, and verify that throttling kicks in based on whichever limit is 
reached first. Something like: 2 IPs, each per-IP limit < per-listener limit, 
but the sum of the per-IP limits > the listener limit. So, if you reach the 
limit on one IP, the broker would not throttle the second IP until it reaches 
the per-listener limit. It does not have to be exactly this; we just need to 
verify how per-IP throttling interacts with per-listener throttling. 









[GitHub] [kafka] gwenshap closed pull request #9398: MINOR update comments and docs to be gender-neutral

2020-10-08 Thread GitBox


gwenshap closed pull request #9398:
URL: https://github.com/apache/kafka/pull/9398


   







[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-08 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502086777



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1242,7 +1314,56 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == "connection-accept-rate" &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey("ip")
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+
+ip match {
+  case Some(addr) =>
+val address = InetAddress.getByName(addr)
+if (maxConnectionRate.isDefined) {
+  info(s"Updating max connection rate override for $address to 
${maxConnectionRate.get}")
+  connectionRatePerIp.put(address, maxConnectionRate.get)
+} else {
+  info(s"Removing max connection rate override for $address")
+  connectionRatePerIp.remove(address)
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+val newQuota = maxConnectionRate.getOrElse(Int.MaxValue)

Review comment:
   You can use you new constant 
`DynamicConfig.Ip.UnlimitedConnectionCreationRate` instead of `Int.MaxValue` 
here.









[GitHub] [kafka] apovzner commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-10-08 Thread GitBox


apovzner commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r502084924



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1203,14 +1262,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private val listenerCounts = mutable.Map[ListenerName, Int]()
   private[network] val maxConnectionsPerListener = mutable.Map[ListenerName, 
ListenerConnectionQuota]()
   @volatile private var totalCount = 0
-
+  @volatile private var defaultConnectionRatePerIp = 
DynamicConfig.Ip.DefaultConnectionCreationRate
+  private val inactiveSensorExpirationTimeSeconds = 
TimeUnit.HOURS.toSeconds(1);
+  private val connectionRatePerIp = new ConcurrentHashMap[InetAddress, Int]()
+  private val lock = new ReentrantReadWriteLock()
+  private val sensorAccessor = new SensorAccess(lock, metrics)
   // sensor that tracks broker-wide connection creation rate and limit (quota)
-  private val brokerConnectionRateSensor = 
createConnectionRateQuotaSensor(config.maxConnectionCreationRate)
+  private val brokerConnectionRateSensor = 
getOrCreateConnectionRateQuotaSensor(config.maxConnectionCreationRate, 
BrokerQuotaEntity)
   private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds.toLong)
 
+
   def inc(listenerName: ListenerName, address: InetAddress, 
acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  waitForConnectionSlot(listenerName, acceptorBlockedPercentMeter)
+  val startThrottleTimeMs = time.milliseconds
+
+  val ipThrottleTimeMs = recordIpConnectionMaybeThrottle(address, 
startThrottleTimeMs)

Review comment:
   It would be more efficient if we throttled IPs **after** we know that we 
can accept a connection based on the broker-wide and per-listener limits, since 
reaching the broker/listener limits blocks the acceptor thread while throttling 
IPs needs more processing. Otherwise, if you reach both the broker and the 
per-IP limit, the broker will continue accepting and delaying connections where 
it would be justified to block the acceptor thread based on reaching the broker 
rate limit. Basically, call `waitForConnectionSlot` first. This is similar to 
how we check the per-IP limit on the number of connections after we know that 
we can accept a new connection based on the broker/listener limits. 









[GitHub] [kafka] gwenshap commented on a change in pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629

2020-10-08 Thread GitBox


gwenshap commented on a change in pull request #9367:
URL: https://github.com/apache/kafka/pull/9367#discussion_r502083707



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConfigUtils {
+
+private static final Logger log = 
LoggerFactory.getLogger(ConfigUtils.class);
+
+/**
+ * Translates deprecated configurations into their non-deprecated 
equivalents
+ *
+ * @param configs the input configuration
+ * @param aliasGroups An array of arrays of synonyms.  Each synonym array 
begins with the non-deprecated synonym
+ *For example, new String[][] { { a, b }, { c, d, e} }
+ *would declare b as a deprecated synonym for a,
+ *and d and e as deprecated synonyms for c.
+ * @return a new configuration map with deprecated  keys translated to 
their non-deprecated equivalents
+ */
+public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T> configs, String[][] aliasGroups) {
+Set<String> aliasSet = Stream.of(aliasGroups).flatMap(Stream::of).collect(Collectors.toSet());
+
+// pass through all configurations without aliases
+Map<String, T> newConfigs = configs.entrySet().stream()
+.filter(e -> !aliasSet.contains(e.getKey()))
+// filter out null values
+.filter(e -> Objects.nonNull(e.getValue()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+Stream.of(aliasGroups).forEachOrdered(aliasGroup -> {
+String target = aliasGroup[0];
+
+List<String> deprecated = Stream.of(aliasGroup)
+.skip(1) // skip target
+.filter(configs::containsKey)
+.collect(Collectors.toList());
+
+if (deprecated.isEmpty()) {
+// No deprecated key(s) found.
+if (configs.containsKey(target)) {
+newConfigs.put(target, configs.get(target));
+}
+return;
+}
+
+String aliasString = String.join(", ", deprecated);
+
+if (configs.containsKey(target)) {
+// Ignore the deprecated key(s) because the actual key was set.
+log.error(target + " was configured, as well as the deprecated 
alias(es) " +
+  aliasString + ".  Using the value of " + target);
+newConfigs.put(target, configs.get(target));
+} else if (deprecated.size() > 1) {
+log.error("The configuration keys " + aliasString + " are 
deprecated and may be " +

Review comment:
   I like the extra-detailed error messages, thank you.









[GitHub] [kafka] gwenshap commented on a change in pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629

2020-10-08 Thread GitBox


gwenshap commented on a change in pull request #9367:
URL: https://github.com/apache/kafka/pull/9367#discussion_r502082826



##
File path: clients/src/main/java/org/apache/kafka/common/utils/ConfigUtils.java
##
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConfigUtils {
+
+private static final Logger log = 
LoggerFactory.getLogger(ConfigUtils.class);
+
+/**
+ * Translates deprecated configurations into their non-deprecated 
equivalents
+ *
+ * @param configs the input configuration
+ * @param aliasGroups An array of arrays of synonyms.  Each synonym array 
begins with the non-deprecated synonym
+ *For example, new String[][] { { a, b }, { c, d, e} }
+ *would declare b as a deprecated synonym for a,
+ *and d and e as deprecated synonyms for c.
+ * @return a new configuration map with deprecated  keys translated to 
their non-deprecated equivalents
+ */
+public static <T> Map<String, T> translateDeprecatedConfigs(Map<String, T> configs, String[][] aliasGroups) {
+Set<String> aliasSet = Stream.of(aliasGroups).flatMap(Stream::of).collect(Collectors.toSet());
+
+// pass through all configurations without aliases
+Map<String, T> newConfigs = configs.entrySet().stream()
+.filter(e -> !aliasSet.contains(e.getKey()))
+// filter out null values
+.filter(e -> Objects.nonNull(e.getValue()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+Stream.of(aliasGroups).forEachOrdered(aliasGroup -> {
+String target = aliasGroup[0];
+
+List<String> deprecated = Stream.of(aliasGroup)

Review comment:
   We may be able to save all this hassle by defining the alias groups as a map 
of `target` to a list of `deprecated` configs? We defined this as a 2-dim array, 
but we always convert it to lists...
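
   Something like the following shape, reusing the a/b/c/d/e example from the javadoc (purely a sketch of the suggestion, not the committed API):

```scala
object AliasGroupShapes {
  // suggested: target key -> its deprecated aliases
  val asMap: Map[String, List[String]] = Map(
    "a" -> List("b"),      // b is a deprecated synonym for a
    "c" -> List("d", "e")  // d and e are deprecated synonyms for c
  )

  // current: the same information encoded positionally in a 2-dim array
  val asArray: Array[Array[String]] = Array(Array("a", "b"), Array("c", "d", "e"))
}
```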









[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r502078584



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -782,7 +782,7 @@ class LogTest {
 }
 
 // Retain snapshots for the last 2 segments
-ProducerStateManager.deleteSnapshotsBefore(logDir, 
segmentOffsets(segmentOffsets.size - 2))
+ProducerStateManager.listSnapshotFiles(logDir).filter(_.offset < 
segmentOffsets(segmentOffsets.size - 2)).foreach(_.deleteIfExists())

Review comment:
   Yes, it should work if we switch these back to using 
deleteSnapshotsBefore. Thanks!









[GitHub] [kafka] guozhangwang commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

2020-10-08 Thread GitBox


guozhangwang commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r502067124



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+/**
+ * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for 
users to test their {@link Processor},
+ * {@link Transformer}, and {@link ValueTransformer} implementations.
+ * 
+ * The tests for this class 
(org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
+ * tests that serve as example usage.
+ * 
+ * Note that this class does not take any automated actions (such as firing 
scheduled punctuators).
+ * It simply captures any data it witnesses.
+ * If you require more automated tests, we recommend wrapping your {@link 
Processor} in a minimal source-processor-sink
+ * {@link Topology} and using the {@link TopologyTestDriver}.
+ */
+public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier {

Review comment:
   Since we are adding a new class, could we have it extend 
InternalProcessorContext, so that when we remove the old ones we can also 
clean up the non-testing functions that branch on checking `instanceof 
InternalProcessorContext`? I'm assuming that `InternalProcessorContext` would 
stay in the end state; it would just extend `api.ProcessorContext` and not `StateStoreContext` in the future.

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language g

[GitHub] [kafka] xvrl opened a new pull request #9398: MINOR update comments and docs to be gender-neutral

2020-10-08 Thread GitBox


xvrl opened a new pull request #9398:
URL: https://github.com/apache/kafka/pull/9398


   cc @gwenshap 







[GitHub] [kafka] gwenshap closed pull request #9366: KAFKA-10571 Replace blackout with backoff for KIP-629

2020-10-08 Thread GitBox


gwenshap closed pull request #9366:
URL: https://github.com/apache/kafka/pull/9366


   







[GitHub] [kafka] xvrl commented on pull request #9367: KAFKA-10570 Rename JMXReporter configs for KIP-629

2020-10-08 Thread GitBox


xvrl commented on pull request #9367:
URL: https://github.com/apache/kafka/pull/9367#issuecomment-705863610


   jdk8 test failures appear unrelated







[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r502052730



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
   Yeah, it should be `0` the first time you call it, then `1` the second 
time, and then back to `0` again on the third call









[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2020-10-08 Thread Lauren McDonald (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210472#comment-17210472
 ] 

Lauren McDonald commented on KAFKA-7447:


We are on version 2.3.* and saw this issue multiple times. Up to you if you 
want to close it; we put in some other manual fixes to get around it (like 
turning off auto leader rebalance, which is not ideal). 
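
For reference, the workaround mentioned above boils down to a single broker
setting. A minimal sketch of a server.properties excerpt (not taken from the
reporter's configuration):

{code}
# Workaround only: stop the controller from automatically moving leadership back
# to a freshly restarted broker before it has fully caught up on all partitions.
auto.leader.rebalance.enable=false
{code}

Leadership then has to be moved back by hand (for example with the preferred
replica election tool) once all partitions are fully in sync.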

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that the consumer 
> was following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages only to those regarding 
> __consumer_offsets-29 because it's just too much to paste, otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kaf

[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


junrao commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r502023899



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -782,7 +782,7 @@ class LogTest {
 }
 
 // Retain snapshots for the last 2 segments
-ProducerStateManager.deleteSnapshotsBefore(logDir, 
segmentOffsets(segmentOffsets.size - 2))
+ProducerStateManager.listSnapshotFiles(logDir).filter(_.offset < 
segmentOffsets(segmentOffsets.size - 2)).foreach(_.deleteIfExists())

Review comment:
   Since deleteSnapshotsBefore() still exist, could we keep using it?

##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -794,16 +794,12 @@ class LogTest {
 
 // Only delete snapshots before the base offset of the recovery point 
segment (post KAFKA-5829 behaviour) to
 // avoid reading all segments
-ProducerStateManager.deleteSnapshotsBefore(logDir, 
offsetForRecoveryPointSegment)
+ProducerStateManager.listSnapshotFiles(logDir).filter(_.offset < 
offsetForRecoveryPointSegment).foreach(_.deleteIfExists())

Review comment:
   Since deleteSnapshotsBefore() still exist, could we keep using it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2020-10-08 Thread James Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210470#comment-17210470
 ] 

James Cheng commented on KAFKA-7447:


Hi. The consensus seems to be that this was fixed in 
https://issues.apache.org/jira/browse/KAFKA-8896 

Should we update the Status, Resolution, and Fixed fields to correspond to 
https://issues.apache.org/jira/browse/KAFKA-8896 ?

 

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that the consumer 
> was following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages only to those regarding 
> __consumer_offsets-29 because it's just too much to paste, otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from _

[GitHub] [kafka] cmccabe commented on pull request #9390: MINOR: Implement ApiError#equals and hashCode

2020-10-08 Thread GitBox


cmccabe commented on pull request #9390:
URL: https://github.com/apache/kafka/pull/9390#issuecomment-705828611


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9390: MINOR: Implement ApiError#equals and hashCode

2020-10-08 Thread GitBox


cmccabe commented on pull request #9390:
URL: https://github.com/apache/kafka/pull/9390#issuecomment-705826094


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9628) Replace Produce request/response with automated protocol

2020-10-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210441#comment-17210441
 ] 

Mickael Maison commented on KAFKA-9628:
---

[~chia7712] I've reassigned it to you, go for it

> Replace Produce request/response with automated protocol
> 
>
> Key: KAFKA-9628
> URL: https://issues.apache.org/jira/browse/KAFKA-9628
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Chia-Ping Tsai
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9628) Replace Produce request/response with automated protocol

2020-10-08 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-9628:
-

Assignee: Chia-Ping Tsai  (was: Mickael Maison)

> Replace Produce request/response with automated protocol
> 
>
> Key: KAFKA-9628
> URL: https://issues.apache.org/jira/browse/KAFKA-9628
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Chia-Ping Tsai
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501979388



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -1226,6 +1225,104 @@ class LogTest {
 assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+val log = createLog(logDir, logConfig)
+val pid1 = 1L
+val epoch = 0.toShort
+
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
+log.updateHighWatermark(log.logEndOffset)
+
+assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+// Sleep to breach the retention period
+mockTime.sleep(1000 * 60 + 1)
+log.deleteOldSegments()
+// Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+mockTime.sleep(1)
+assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
+val log = createLog(logDir, logConfig)
+val pid1 = 1L
+val epoch = 0.toShort
+
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+log.updateHighWatermark(log.logEndOffset)
+assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+// Increment the log start offset to exclude the first two segments.
+log.maybeIncrementLogStartOffset(log.logEndOffset - 1, 
ClientRecordDeletion)
+log.deleteOldSegments()
+// Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+mockTime.sleep(1)
+assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+val log = createLog(logDir, logConfig)
+val pid1 = 1L
+val epoch = 0.toShort
+val cleaner = new Cleaner(id = 0,
+  offsetMap = new FakeOffsetMap(Int.MaxValue),
+  ioBufferSize = 64 * 1024,
+  maxIoBufferSize = 64 * 1024,
+  dupBufferLoadFactor = 0.75,
+  throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = 
mockTime),
+  time = mockTime,
+  checkDone = _ => {})
+
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"a".getBytes())), producerId = pid1,
+  producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"b".getBytes())), producerId = pid1,
+  producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"c".getBytes())), producerId = pid1,
+  producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+log.updateHighWatermark(log.logEndOffset)
+assertEquals("expected a snapshot file per segment base offset, except the 
first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+// Clean segments, this should delete everything except the active segment 
since there only
+// exists the key "a".
+cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+log.deleteOldSegments()
+// Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+mockTime.sleep(1)
+assertEquals("expected a snapshot file per segment base offset, excluding 
the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
P

[GitHub] [kafka] gardnervickers commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#issuecomment-705786361


   @junrao I think that `Log.takeProducerSnapshot` is being used for testing in 
a few places in `LogTest`, though my IDE does not pick that up for some reason. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501970716



##
File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
##
@@ -834,6 +834,40 @@ class ProducerStateManagerTest {
 assertEquals(None, 
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
+  @Test
+  def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = {
+// Test that when stray snapshots are removed, the largest stray snapshot 
is kept around. This covers the case where
+// the broker shutdown cleanly and emitted a snapshot file larger than the 
base offset of the active segment.
+
+// Create 3 snapshot files at different offsets.
+Log.producerSnapshotFile(logDir, 42).createNewFile()
+Log.producerSnapshotFile(logDir, 5).createNewFile()
+Log.producerSnapshotFile(logDir, 2).createNewFile()
+
+// claim that we only have one segment with a base offset of 5
+stateManager.removeStraySnapshots(Set(5))
+
+// The snapshot file at offset 2 should be considered a stray, but the 
snapshot at 42 should be kept
+// around because it is the largest snapshot.
+assertEquals(Some(42), stateManager.latestSnapshotOffset)
+assertEquals(Some(5), stateManager.oldestSnapshotOffset)
+assertEquals(Seq(5, 42), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testRemoveAllStraySnapshots(): Unit = {
+// Test that when stray snapshots are removed, all stray snapshots are 
removed when the base offset of the largest
+// segment exceeds the offset of the largest stray snapshot.

Review comment:
   Hmm, I think my comment here could be worded better. Offset `42` here is 
not a "stray", since we provide it along with the list of segmentBaseOffsets to 
`removeStraySnapshots`. 
   
   I'll change up the wording on this, thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501968249



##
File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
##
@@ -834,6 +834,40 @@ class ProducerStateManagerTest {
 assertEquals(None, 
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
+  @Test
+  def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = {
+// Test that when stray snapshots are removed, the largest stray snapshot 
is kept around. This covers the case where
+// the broker shutdown cleanly and emitted a snapshot file larger than the 
base offset of the active segment.
+
+// Create 3 snapshot files at different offsets.
+Log.producerSnapshotFile(logDir, 42).createNewFile()
+Log.producerSnapshotFile(logDir, 5).createNewFile()
+Log.producerSnapshotFile(logDir, 2).createNewFile()
+
+// claim that we only have one segment with a base offset of 5
+stateManager.removeStraySnapshots(Set(5))
+
+// The snapshot file at offset 2 should be considered a stray, but the 
snapshot at 42 should be kept
+// around because it is the largest snapshot.
+assertEquals(Some(42), stateManager.latestSnapshotOffset)
+assertEquals(Some(5), stateManager.oldestSnapshotOffset)
+assertEquals(Seq(5, 42), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testRemoveAllStraySnapshots(): Unit = {
+// Test that when stray snapshots are removed, all stray snapshots are 
removed when the base offset of the largest
+// segment exceeds the offset of the largest stray snapshot.

Review comment:
   I think this sentence is a bit confusing. Snapshot 42 is not meant to be 
a stray snapshot here, only 5 and 2 are. I will try to reword this. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException

2020-10-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210410#comment-17210410
 ] 

Matthias J. Sax commented on KAFKA-7334:


[~vladimir_shadrin] I added you to the list of contributors and assigned the 
ticket to you. You can now also self-assign tickets. Please make sure you 
assign tickets to yourself when starting to work on them (to avoid having 
somebody else work on them in parallel). Thank you!

> Suggest changing config for state.dir in case of FileNotFoundException
> --
>
> Key: KAFKA-7334
> URL: https://issues.apache.org/jira/browse/KAFKA-7334
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Vladimir
>Priority: Major
>  Labels: newbie
> Fix For: 2.7.0
>
>
> Quoting stack trace from KAFKA-5998 :
> {code}
> WARN [2018-08-22 03:17:03,745] 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager: task 
> [0_45] Failed to write offset checkpoint file to 
> /tmp/kafka-streams/
> {{ /0_45/.checkpoint: {}}}
> {{ ! java.nio.file.NoSuchFileException: 
> /tmp/kafka-streams//0_45/.checkpoint.tmp}}
> {{ ! at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}}
> {{ ! at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}}
> {code}
> When state.dir is left at default configuration, there is a chance that 
> certain files under the state directory are cleaned by OS since the default 
> dir starts with /tmp/kafka-streams.
> [~mjsax] and I proposed to suggest to the user, through the exception message, 
> to change the location for state.dir.
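
For illustration, a minimal sketch of overriding state.dir in a Streams
application so that local state does not live under /tmp. The path and
application id below are placeholders, not values suggested by the ticket:

{code}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class DurableStateDirExample {
    public static KafkaStreams buildStreams(final Topology topology) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keep RocksDB state and checkpoint files outside /tmp so the OS
        // temp cleaner cannot delete them out from under a running app.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return new KafkaStreams(topology, props);
    }
}
{code}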



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7334) Suggest changing config for state.dir in case of FileNotFoundException

2020-10-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7334:
--

Assignee: Vladimir

> Suggest changing config for state.dir in case of FileNotFoundException
> --
>
> Key: KAFKA-7334
> URL: https://issues.apache.org/jira/browse/KAFKA-7334
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Assignee: Vladimir
>Priority: Major
>  Labels: newbie
> Fix For: 2.7.0
>
>
> Quoting stack trace from KAFKA-5998 :
> {code}
> WARN [2018-08-22 03:17:03,745] 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager: task 
> [0_45] Failed to write offset checkpoint file to 
> /tmp/kafka-streams/
> {{ /0_45/.checkpoint: {}}}
> {{ ! java.nio.file.NoSuchFileException: 
> /tmp/kafka-streams//0_45/.checkpoint.tmp}}
> {{ ! at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}}
> {{ ! at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}}
> {code}
> When state.dir is left at default configuration, there is a chance that 
> certain files under the state directory are cleaned by OS since the default 
> dir starts with /tmp/kafka-streams.
> [~mjsax] and I proposed to suggest to the user, through the exception message, 
> to change the location for state.dir.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9380: KAFKA-7334: Suggest changing config for state.dir in case of FileNotF…

2020-10-08 Thread GitBox


mjsax commented on pull request #9380:
URL: https://github.com/apache/kafka/pull/9380#issuecomment-705772843


   Thanks for the PR @voffcheg109!
   
   Merged to `trunk` and cherry-picked to `2.7` branch.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #9380: KAFKA-7334: Suggest changing config for state.dir in case of FileNotF…

2020-10-08 Thread GitBox


mjsax merged pull request #9380:
URL: https://github.com/apache/kafka/pull/9380


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501957800



##
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##
@@ -1226,6 +1225,104 @@ class LogTest {
 assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
 
+  @Test
+  def testRetentionDeletesProducerStateSnapshots(): Unit = {
+val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = 0, retentionMs = 1000 * 60, fileDeleteDelayMs = 0)
+val log = createLog(logDir, logConfig)
+val pid1 = 1L
+val epoch = 0.toShort
+
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+
+log.updateHighWatermark(log.logEndOffset)
+
+assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+// Sleep to breach the retention period
+mockTime.sleep(1000 * 60 + 1)
+log.deleteOldSegments()
+// Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+mockTime.sleep(1)
+assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
+val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
retentionBytes = -1, fileDeleteDelayMs = 0)
+val log = createLog(logDir, logConfig)
+val pid1 = 1L
+val epoch = 0.toShort
+
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
+  producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+log.updateHighWatermark(log.logEndOffset)
+assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+// Increment the log start offset to exclude the first two segments.
+log.maybeIncrementLogStartOffset(log.logEndOffset - 1, 
ClientRecordDeletion)
+log.deleteOldSegments()
+// Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+mockTime.sleep(1)
+assertEquals("expect a single producer state snapshot remaining", 1, 
ProducerStateManager.listSnapshotFiles(logDir).size)
+  }
+
+  @Test
+  def testCompactionDeletesProducerStateSnapshots(): Unit = {
+val logConfig = LogTest.createLogConfig(segmentBytes = 2048 * 5, 
cleanupPolicy = LogConfig.Compact, fileDeleteDelayMs = 0)
+val log = createLog(logDir, logConfig)
+val pid1 = 1L
+val epoch = 0.toShort
+val cleaner = new Cleaner(id = 0,
+  offsetMap = new FakeOffsetMap(Int.MaxValue),
+  ioBufferSize = 64 * 1024,
+  maxIoBufferSize = 64 * 1024,
+  dupBufferLoadFactor = 0.75,
+  throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = 
mockTime),
+  time = mockTime,
+  checkDone = _ => {})
+
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"a".getBytes())), producerId = pid1,
+  producerEpoch = epoch, sequence = 0), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"b".getBytes())), producerId = pid1,
+  producerEpoch = epoch, sequence = 1), leaderEpoch = 0)
+log.roll()
+log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"c".getBytes())), producerId = pid1,
+  producerEpoch = epoch, sequence = 2), leaderEpoch = 0)
+log.updateHighWatermark(log.logEndOffset)
+assertEquals("expected a snapshot file per segment base offset, except the 
first segment", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
+
+// Clean segments, this should delete everything except the active segment 
since there only
+// exists the key "a".
+cleaner.clean(LogToClean(log.topicPartition, log, 0, log.logEndOffset))
+log.deleteOldSegments()
+// Sleep to breach the file delete delay and run scheduled file deletion 
tasks
+mockTime.sleep(1)
+assertEquals("expected a snapshot file per segment base offset, excluding 
the first", log.logSegments.map(_.baseOffset).toSeq.sorted.drop(1), 
P

[GitHub] [kafka] lct45 commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


lct45 commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501956905



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
> goes back and forth between the two expected values 
   What do you mean by expected values? Toggling between the default of `0` and 
`1` after `partitionAssignor.subscriptionUserData` is called the first time?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
> goes back and forth between the two expected values 
   
   What do you mean by expected values? Toggling between the default of `0` and 
`1` after `partitionAssignor.subscriptionUserData` is called the first time?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] efeg commented on pull request #9397: KAFKA-10583: Add documentation on the thread-safety of KafkaAdminClient.

2020-10-08 Thread GitBox


efeg commented on pull request #9397:
URL: https://github.com/apache/kafka/pull/9397#issuecomment-705765230


   @cmccabe Do you think you might be able to take a look at this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] efeg opened a new pull request #9397: KAFKA-10583: Add documentation on the thread-safety of KafkaAdminClient.

2020-10-08 Thread GitBox


efeg opened a new pull request #9397:
URL: https://github.com/apache/kafka/pull/9397


   Other than a Stack Overflow comment (see 
https://stackoverflow.com/a/61738065) by Colin Patrick McCabe (@cmccabe) and a 
proposed design note on the KIP-117 wiki, there is no source that verifies the 
thread-safety of `KafkaAdminClient`.
   
   This patch updates JavaDoc of `KafkaAdminClient` to clarify its 
thread-safety.
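
As a rough sketch of the usage pattern that the documented thread-safety allows,
one admin client instance shared by several threads (the broker address, thread
count, and class name below are placeholders, not part of the patch):

{code}
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class SharedAdminClientExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A single client instance is shared by every worker thread.
        try (Admin admin = Admin.create(props)) {
            final ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 4; i++) {
                pool.submit(() -> admin.listTopics().names().get());
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
{code}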
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501944185



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+  tm.put(f.offset, f)
+}
+tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. 
Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will 
be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in 
segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an 
associated segment file, but not to remove
+   */
+  private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = 
{
+var latestStraySnapshot: Option[SnapshotFile] = None
+val ss = loadSnapshots()
+for (snapshot <- ss.values().asScala) {
+  val key = snapshot.offset
+  latestStraySnapshot match {
+case Some(prev) =>
+  if (!segmentBaseOffsets.contains(key)) {
+// this snapshot is now the largest stray snapshot.
+prev.deleteIfExists()
+ss.remove(prev.offset)
+latestStraySnapshot = Some(snapshot)
+  }
+case None =>
+  if (!segmentBaseOffsets.contains(key)) {
+latestStraySnapshot = Some(snapshot)

Review comment:
   We perform a check below which may cover this case. After setting the 
`snapshots` map, we look at the latest snapshot in the map. If the latest 
snapshot in the map is not equal to the `latestStraySnapshot`, we delete the 
`latestStraySnapshot`. 
   
   I think this is a bit confusing though, so it might be better if instead we 
directly check that the `latestStraySnapshot` is larger than the largest offset 
in `segmentBaseOffsets`. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gardnervickers commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


gardnervickers commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501942512



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -653,36 +697,44 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   def takeSnapshot(): Unit = {
 // If not a new offset, then it is not worth taking another snapshot
 if (lastMapOffset > lastSnapOffset) {
-  val snapshotFile = Log.producerSnapshotFile(logDir, lastMapOffset)
+  val snapshotFile = SnapshotFile(Log.producerSnapshotFile(_logDir, 
lastMapOffset))
   info(s"Writing producer snapshot at offset $lastMapOffset")
-  writeSnapshot(snapshotFile, producers)
+  writeSnapshot(snapshotFile.file, producers)
+  snapshots.put(snapshotFile.offset, snapshotFile)
 
   // Update the last snap offset according to the serialized map
   lastSnapOffset = lastMapOffset
 }
   }
 
+  /**
+   * Update the parentDir for this ProducerStateManager and all of the 
snapshot files which it manages.
+   */
+  def updateParentDir(parentDir: File): Unit ={
+_logDir = parentDir
+snapshots.forEach((_, s) => s.updateParentDir(parentDir))
+  }
+
   /**
* Get the last offset (exclusive) of the latest snapshot file.
*/
-  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(file => 
offsetFromFile(file))
+  def latestSnapshotOffset: Option[Long] = latestSnapshotFile.map(_.offset)
 
   /**
* Get the last offset (exclusive) of the oldest snapshot file.
*/
-  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => 
offsetFromFile(file))
+  def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(_.offset)
 
   /**
-   * When we remove the head of the log due to retention, we need to remove 
snapshots older than
-   * the new log start offset.
+   * Remove any unreplicated transactions lower than the provided 
logStartOffset and bring the lastMapOffset forward
+   * if necessary.
*/
-  def truncateHead(logStartOffset: Long): Unit = {
+  def onLogStartOffsetIncremented(logStartOffset: Long): Unit = {
 removeUnreplicatedTransactions(logStartOffset)
 
 if (lastMapOffset < logStartOffset)
   lastMapOffset = logStartOffset
 
-deleteSnapshotsBefore(logStartOffset)

Review comment:
   The idea here is to clear un-replicated transactions and optionally 
advance the `lastMapOffset` and `lastSnapOffset` when the logStartOffset is 
advanced, but to leave the snapshot files around. The corresponding snapshot 
files should be removed during the retention pass as we cleanup the associated 
segment files. 
   
   I was attempting to optimize incrementing the logStartOffset a bit so that 
we don't need to delete the snapshot files from the request handler thread when 
handling `DELETE_RECORDS`. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210386#comment-17210386
 ] 

Matthias J. Sax commented on KAFKA-10585:
-

[~dongjin] – please assign the ticket to yourself if you plan to work on this.

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Rohan Desai
>Priority: Minor
>  Labels: newbie++
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.
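
For context, the call being discussed looks roughly like this from an
application's perspective (the helper class and method names are made up for
the example):

{code}
import org.apache.kafka.streams.KafkaStreams;

public class LocalStateCleanup {
    public static void stopAndWipeLocalState(final KafkaStreams streams) {
        streams.close();
        // cleanUp() removes the task directories and the global state directory,
        // but, as described in this ticket, the enclosing
        // <state.dir>/<application.id> directory itself is left behind.
        streams.cleanUp();
    }
}
{code}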



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10585:

Priority: Minor  (was: Major)

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Rohan Desai
>Priority: Minor
>  Labels: newbie++
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10585:

Issue Type: Improvement  (was: Bug)

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Rohan Desai
>Priority: Minor
>  Labels: newbie++
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10585:
--
Labels: newbie++  (was: )

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Rohan Desai
>Priority: Major
>  Labels: newbie++
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

2020-10-08 Thread GitBox


vvcephei commented on a change in pull request #9396:
URL: https://github.com/apache/kafka/pull/9396#discussion_r501891955



##
File path: checkstyle/suppressions.xml
##
@@ -194,13 +194,13 @@
   files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
 
 
+  
files="(EosBetaUpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest).java"/>

Review comment:
   This test violates both measures of complexity by virtue of the way it 
works: the test specifically includes a lot of loops so that it can generate 
every combination of store and store builder configuration to verify that 
everything works as expected.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##
@@ -162,4 +164,30 @@ public Headers headers() {
 public Record withHeaders(final Headers headers) {
 return new Record<>(key, value, timestamp, headers);
 }
+
+@Override
+public String toString() {

Review comment:
   added for quality-of-life debugging

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##
@@ -75,18 +75,25 @@ public String name() {
 @Deprecated
 @Override
 public void init(final ProcessorContext context, final StateStore root) {
-this.context = (InternalProcessorContext) context;
-
-final StreamsMetricsImpl metrics = this.context.metrics();
 final String threadId = Thread.currentThread().getName();
 final String taskName = context.taskId().toString();
-expiredRecordSensor = 
TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
-threadId,
-taskName,
-metricScope,
-name,
-metrics
-);
+
+// The provided context is not required to implement 
InternalProcessorContext,
+// If it doesn't, we can't record this metric.
+if (context instanceof InternalProcessorContext) {

Review comment:
   As a testament to `MockProcessorContextStateStoreTest`, it actually 
found this bug. I had overlooked this usage while making the other root stores 
context-implementation agnostic in the last PR.

##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
##
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ClientUtils;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.com

[GitHub] [kafka] junrao commented on pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


junrao commented on pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#issuecomment-705727027


   Also, it seems that Log.takeProducerSnapshot() is no longer used?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10271) Performance regression while fetching a key from a single partition

2020-10-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10271.
---
Fix Version/s: (was: 2.5.2)
   2.7.0
   Resolution: Fixed

> Performance regression while fetching a key from a single partition
> ---
>
> Key: KAFKA-10271
> URL: https://issues.apache.org/jira/browse/KAFKA-10271
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0, 2.5.1
>Reporter: Dima R
>Assignee: Dima R
>Priority: Major
>  Labels: KAFKA-10030, KAFKA-9445, KIP-562
> Fix For: 2.7.0, 2.6.1
>
> Attachments: 9020.png
>
>
> This is follow-up bug for KAFKA-10030 
> StreamThreadStateStoreProvider excessive loop over calling 
> internalTopologyBuilder.topicGroups(), which is synchronized, thus causing 
> significant performance degradation to the caller, especially when store has 
> many partitions.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-08 Thread GitBox


guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705723421


   Cherry-picked to 2.6 as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9396: KAFKA-10437: Implement new PAPI support for test-utils

2020-10-08 Thread GitBox


vvcephei opened a new pull request #9396:
URL: https://github.com/apache/kafka/pull/9396


   Implements KIP-478 for the test-utils module:
   * adds mocks of the new ProcessorContext and StateStoreContext
   * adds tests that all stores and store builders are usable with the new mock
   * adds tests that the new Processor api is usable with the new mock
   * updates the demonstration Processor to the new api
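   
   A rough, made-up example of what a processor on the new API looks like (this
   class is illustrative only and is not the demonstration Processor from the PR):
   
{code}
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class UpperCaseProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        // Typed Record objects replace the old (key, value) plus context-metadata style.
        context.forward(record.withValue(record.value().toUpperCase()));
    }
}
{code}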
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-08 Thread GitBox


guozhangwang commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705707869


   Test passed, merged to trunk.
   
   Thanks @dima5rr for your great contribution!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-08 Thread GitBox


guozhangwang merged pull request #9020:
URL: https://github.com/apache/kafka/pull/9020


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao removed a comment on pull request #9393: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis

2020-10-08 Thread GitBox


junrao removed a comment on pull request #9393:
URL: https://github.com/apache/kafka/pull/9393#issuecomment-705702092


   In this PR, I have addressed the review comments from @chia7712 in #9001 
which were provided after #9001 was merged. The changes are made mainly to 
KafkaAdminClient:
   
   Improve error message in updateFeatures api when feature name is empty.
   Propagate top-level error message in updateFeatures api.
   Add an empty-parameter variety for describeFeatures api.
   Minor documentation updates to @param and @return to make these resemble 
other apis.
   
   Reviewers: Chia-Ping Tsai , Jun Rao 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501876646



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##
@@ -59,6 +60,7 @@
 );
 
 private final static String IGNORED_USER_ENDPOINT = 
"ignoredUserEndpoint:80";
+private static final byte[] IGNORED_UNIQUE_FIELD = Bytes.EMPTY;

Review comment:
   nit: let's use `new byte[1]` for this to make sure it's actually being 
ignored when it should be (since apparently it won't notice if you just pass in 
empty bytes for this field on a version < 8)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #9393: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis

2020-10-08 Thread GitBox


junrao merged pull request #9393:
URL: https://github.com/apache/kafka/pull/9393


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #9393: KAFKA-10028: Minor fixes to describeFeatures and updateFeatures apis

2020-10-08 Thread GitBox


junrao commented on pull request #9393:
URL: https://github.com/apache/kafka/pull/9393#issuecomment-705702092


   In this PR, I have addressed the review comments from @chia7712 in #9001 
which were provided after #9001 was merged. The changes are made mainly to 
KafkaAdminClient:
   
   Improve error message in updateFeatures api when feature name is empty.
   Propagate top-level error message in updateFeatures api.
   Add an empty-parameter variety for describeFeatures api.
   Minor documentation updates to @param and @return to make these resemble 
other apis.
   
   Reviewers: Chia-Ping Tsai , Jun Rao 
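   
   For illustration, the empty-parameter describeFeatures variety and an
   updateFeatures call might be exercised roughly as follows; the option and
   result shapes follow the KIP-584 admin API as of this change, and the
   feature name is hypothetical:
   
   import java.util.Collections;
   import java.util.Properties;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.FeatureMetadata;
   import org.apache.kafka.clients.admin.FeatureUpdate;
   import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
   
   public class FeatureOpsExample {
       public static void main(final String[] args) throws Exception {
           final Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           try (Admin admin = Admin.create(props)) {
               // empty-parameter variety: describe the cluster's finalized/supported features
               final FeatureMetadata metadata = admin.describeFeatures().featureMetadata().get();
               System.out.println("Finalized features: " + metadata.finalizedFeatures());
   
               // bump a (hypothetical) feature's max version level to 2, no downgrade allowed
               admin.updateFeatures(
                   Collections.singletonMap("example_feature", new FeatureUpdate((short) 2, false)),
                   new UpdateFeaturesOptions()
               ).all().get();
           }
       }
   }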



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501875691



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
##
@@ -286,7 +286,8 @@ private static Properties streamsProperties(final String 
appId,
 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 
"6"),
 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, 
configuredAssignmentListener),
 mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
-mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName())
+mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)

Review comment:
   Can we add a comment here explaining why we set the thread count so 
high? I feel like we'll forget and be really confused when we stumble across 
this in the future.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ivanyu opened a new pull request #9395: KAFKA-9726: Add LegacyReplicationPolicy for MM2

2020-10-08 Thread GitBox


ivanyu opened a new pull request #9395:
URL: https://github.com/apache/kafka/pull/9395


   This commit adds a new replication policy for MirrorMaker 2, 
`LegacyReplicationPolicy`. This policy imitates the MirrorMaker 1 behavior of 
not renaming replicated topics. The exception is the `heartbeats` topic, which 
is still replicated according to `DefaultReplicationPolicy`.
   
   Avoiding renaming topics brings a number of limitations, among which the 
most important one is the impossibility of detecting replication cycles. This 
makes cross-replication using `LegacyReplicationPolicy` effectively impossible. 
See `LegacyReplicationPolicy` Javadoc for details.
   
   A new method `canTrackSource` is added to `ReplicationPolicy`. Its result 
indicates whether the replication policy can track a topic back to its source 
topic. It is needed so that detection of target topics still works when 
`LegacyReplicationPolicy` is used.
   
   On the testing side, the tests follow the same strategy as for 
`DefaultReplicationPolicy`, with the necessary adjustments (e.g. no 
active/active replication is tested).
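   
   To make the "no rename" behaviour concrete, a rough sketch of such a policy 
is given below. It extends `DefaultReplicationPolicy`, keeps regular topic 
names unchanged, and falls back to the default prefixed naming for heartbeats; 
it is a simplified illustration, not the code in this PR (in particular it 
omits the new `canTrackSource` method):
   
   import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;
   
   // Illustrative "no rename" policy: replicated topics keep their source names,
   // except heartbeats, which keep the default "<source>.<topic>" naming so they
   // remain traceable across clusters.
   public class NoRenameReplicationPolicy extends DefaultReplicationPolicy {
   
       @Override
       public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
           return isHeartbeats(topic) ? super.formatRemoteTopic(sourceClusterAlias, topic) : topic;
       }
   
       @Override
       public String topicSource(final String topic) {
           // without a prefix we cannot tell which cluster a regular topic came from
           return isHeartbeats(topic) ? super.topicSource(topic) : null;
       }
   
       @Override
       public String upstreamTopic(final String topic) {
           return isHeartbeats(topic) ? super.upstreamTopic(topic) : topic;
       }
   
       private static boolean isHeartbeats(final String topic) {
           return topic.endsWith("heartbeats");
       }
   }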
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r501873791



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads; 
{@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = throwable -> 
handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+for (final StreamThread thread : threads) {
+if (streamsUncaughtExceptionHandler != null)  {

Review comment:
   I agree with Bruno, I don't see why anyone would want to reset the 
exception handler (why set it in the first place then?). But if for some reason 
they really did set it at some point in their code and then later on want to 
revert to the default behavior, they can just pass in a handler that returns 
`SHUTDOWN_STREAMTHREAD_THREAD` themselves. If that's really what they want, 
then they should specify it.
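   
   For readers following along, a rough sketch of how an application would 
register the new handler once this lands; the package, interface, and enum 
names here follow the discussion and KIP-671 and should be read as assumptions 
rather than the final API:
   
   import java.util.Properties;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
   
   public class HandlerExample {
       public static void main(final String[] args) {
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "handler-example");
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
   
           final StreamsBuilder builder = new StreamsBuilder();
           builder.stream("input-topic").to("output-topic");
   
           final KafkaStreams streams = new KafkaStreams(builder.build(), props);
           // must be called while the instance is still in CREATED state
           streams.setUncaughtExceptionHandler(exception ->
               StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT);
           streams.start();
       }
   }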
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r501868835



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads; 
{@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = throwable -> 
handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+for (final StreamThread thread : threads) {

Review comment:
I think users may find at least some of the same functionality to be 
useful, in particular the "shutdown client" and "shutdown application" enums. I 
also feel like users may want to be able to restart the global thread.  I think 
we can consider that out of scope for now, but I'd prefer to avoid introducing 
a new method that just accepts the old kind of uncaught exception handler if 
we're just going to deprecate that too.
   
   But I do agree that it's pretty different, and we also don't want to commit 
to adding this functionality for the global thread right away, so we should 
make sure it's clear what is and isn't implemented. So what do you think about 
mirroring the new handler for the StreamThread, e.g. adding a 
"GlobalUncaughtExceptionHandler", and only including the `SHUTDOWN_THREAD` 
enum in the initial version? That way we're set up to easily extend the global 
handling feature without necessarily needing to implement it right away.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #7929: KAFKA-9393: DeleteRecords may cause extreme lock contention for large partition directories

2020-10-08 Thread GitBox


junrao commented on a change in pull request #7929:
URL: https://github.com/apache/kafka/pull/7929#discussion_r501325705



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {

Review comment:
   ProducerStateManager.listSnapshotFiles() could just be 
listSnapshotFiles() ?

##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+  tm.put(f.offset, f)
+}
+tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. 
Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will 
be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in 
segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an 
associated segment file, but not to remove

Review comment:
   Incomplete sentence after "but not to remove".

##
File path: core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
##
@@ -834,6 +834,40 @@ class ProducerStateManagerTest {
 assertEquals(None, 
stateManager.lastEntry(producerId).get.currentTxnFirstOffset)
   }
 
+  @Test
+  def testRemoveStraySnapshotsKeepCleanShutdownSnapshot(): Unit = {
+// Test that when stray snapshots are removed, the largest stray snapshot 
is kept around. This covers the case where
+// the broker shutdown cleanly and emitted a snapshot file larger than the 
base offset of the active segment.
+
+// Create 3 snapshot files at different offsets.
+Log.producerSnapshotFile(logDir, 42).createNewFile()
+Log.producerSnapshotFile(logDir, 5).createNewFile()
+Log.producerSnapshotFile(logDir, 2).createNewFile()
+
+// claim that we only have one segment with a base offset of 5
+stateManager.removeStraySnapshots(Set(5))
+
+// The snapshot file at offset 2 should be considered a stray, but the 
snapshot at 42 should be kept
+// around because it is the largest snapshot.
+assertEquals(Some(42), stateManager.latestSnapshotOffset)
+assertEquals(Some(5), stateManager.oldestSnapshotOffset)
+assertEquals(Seq(5, 42), 
ProducerStateManager.listSnapshotFiles(logDir).map(_.offset).sorted)
+  }
+
+  @Test
+  def testRemoveAllStraySnapshots(): Unit = {
+// Test that when stray snapshots are removed, all stray snapshots are 
removed when the base offset of the largest
+// segment exceeds the offset of the largest stray snapshot.

Review comment:
   Below, the base offset of the largest segment equals, rather than exceeds, 
the offset of the largest stray snapshot.

##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -496,6 +491,53 @@ class ProducerStateManager(val topicPartition: 
TopicPartition,
   // completed transactions whose markers are at offsets above the high 
watermark
   private val unreplicatedTxns = new util.TreeMap[Long, TxnMetadata]
 
+  /**
+   * Load producer state snapshots by scanning the _logDir.
+   */
+  private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, 
SnapshotFile] = {
+val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+for (f <- ProducerStateManager.listSnapshotFiles(_logDir)) {
+  tm.put(f.offset, f)
+}
+tm
+  }
+
+  /**
+   * Scans the log directory, gathering all producer state snapshot files. 
Snapshot files which do not have an offset
+   * corresponding to one of the provided offsets in segmentBaseOffsets will 
be removed, except in the case that there
+   * is a snapshot file at a higher offset than any offset in 
segmentBaseOffsets.
+   *
+   * The goal here is to remove any snapshot files which do not have an 
associated segment file, but not to remove
+   */
+  private[log] def removeStraySnapshots(segmentBaseOffsets: Set[Long]): Unit = 
{
+var lates

[GitHub] [kafka] ableegoldman commented on pull request #9380: KAFKA-7334: Suggest changing config for state.dir in case of FileNotF…

2020-10-08 Thread GitBox


ableegoldman commented on pull request #9380:
URL: https://github.com/apache/kafka/pull/9380#issuecomment-705689770


   Tests passed, should be good to merge @mjsax. Btw the 2.7 branch was just 
cut, so this should be cherry-picked back to 2.7.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


lct45 commented on pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#issuecomment-705688523


   System tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4204/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10589) Rename kafka-replica-verification CLI command line arguments for KIP-629

2020-10-08 Thread Jira
Xavier Léauté created KAFKA-10589:
-

 Summary: Rename kafka-replica-verification CLI command line 
arguments for KIP-629
 Key: KAFKA-10589
 URL: https://issues.apache.org/jira/browse/KAFKA-10589
 Project: Kafka
  Issue Type: Sub-task
Reporter: Xavier Léauté






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10588) Rename kafka-console-consumer CLI command line arguments for KIP-629

2020-10-08 Thread Jira
Xavier Léauté created KAFKA-10588:
-

 Summary: Rename kafka-console-consumer CLI command line arguments 
for KIP-629
 Key: KAFKA-10588
 URL: https://issues.apache.org/jira/browse/KAFKA-10588
 Project: Kafka
  Issue Type: Sub-task
Reporter: Xavier Léauté






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10587) Rename kafka-mirror-maker CLI command line arguments for KIP-629

2020-10-08 Thread Jira
Xavier Léauté created KAFKA-10587:
-

 Summary: Rename kafka-mirror-maker CLI command line arguments for 
KIP-629
 Key: KAFKA-10587
 URL: https://issues.apache.org/jira/browse/KAFKA-10587
 Project: Kafka
  Issue Type: Sub-task
Reporter: Xavier Léauté






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9679) Mock consumer should behave consistent with actual consumer

2020-10-08 Thread Sheikh Araf (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sheikh Araf updated KAFKA-9679:
---
Attachment: (was: image.png)

> Mock consumer should behave consistent with actual consumer
> ---
>
> Key: KAFKA-9679
> URL: https://issues.apache.org/jira/browse/KAFKA-9679
> Project: Kafka
>  Issue Type: Test
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: help-wanted, newbie, newbie++
>
> Right now in MockConsumer we shall return illegal state exception when the 
> buffered records are not able to find corresponding assigned partitions. This 
> is not the case for KafkaConsumer where we shall just not return those data 
> during `poll()` call. This inconsistent behavior should be fixed.
> Note that if we are going to take this fix, the full unit tests need to be 
> executed to make sure no regression is introduced, as some tests are 
> potentially depending on the current MockConsumer behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9679) Mock consumer should behave consistent with actual consumer

2020-10-08 Thread Sheikh Araf (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sheikh Araf updated KAFKA-9679:
---
Attachment: image.png

> Mock consumer should behave consistent with actual consumer
> ---
>
> Key: KAFKA-9679
> URL: https://issues.apache.org/jira/browse/KAFKA-9679
> Project: Kafka
>  Issue Type: Test
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: help-wanted, newbie, newbie++
>
> Right now in MockConsumer we shall return illegal state exception when the 
> buffered records are not able to find corresponding assigned partitions. This 
> is not the case for KafkaConsumer where we shall just not return those data 
> during `poll()` call. This inconsistent behavior should be fixed.
> Note that if we are going to take this fix, the full unit tests need to be 
> executed to make sure no regression is introduced, as some tests are 
> potentially depending on the current MockConsumer behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] xvrl commented on pull request #9366: KAFKA-10571 Replace blackout with backoff for KIP-629

2020-10-08 Thread GitBox


xvrl commented on pull request #9366:
URL: https://github.com/apache/kafka/pull/9366#issuecomment-705676711


   CI seems broken, can we trigger tests again for this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xvrl commented on pull request #9366: KAFKA-10571 Replace blackout with backoff for KIP-629

2020-10-08 Thread GitBox


xvrl commented on pull request #9366:
URL: https://github.com/apache/kafka/pull/9366#issuecomment-705676790


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr removed a comment on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-08 Thread GitBox


dima5rr removed a comment on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705675283


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Rohan Desai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210308#comment-17210308
 ] 

Rohan Desai commented on KAFKA-10585:
-

Yes, this is a good summary.

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Rohan Desai
>Priority: Major
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dima5rr commented on pull request #9020: KAFKA-10271 Performance regression while fetching a key from a single partition

2020-10-08 Thread GitBox


dima5rr commented on pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#issuecomment-705675283


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-08 Thread GitBox


cadonna commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r501802252



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -617,7 +619,19 @@ public void 
shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
 final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
 streams.start();
 try {
-streams.setUncaughtExceptionHandler(null);
+streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);
+fail("Should throw IllegalStateException");
+} catch (final IllegalStateException e) {
+// expected
+}
+}

Review comment:
   Sorry, my bad! It doesn't matter whether it is called or not, since the 
`IllegalStateException` is thrown.

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -617,7 +619,19 @@ public void 
shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState(
 final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
 streams.start();
 try {
-streams.setUncaughtExceptionHandler(null);
+streams.setUncaughtExceptionHandler((Thread.UncaughtExceptionHandler) null);
+fail("Should throw IllegalStateException");
+} catch (final IllegalStateException e) {
+// expected
+}
+}
+
+@Test
+public void 
shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() {

Review comment:
   Actually, there are two tests missing. With the test you added, you test 
the `else` branch on line 400 in `KafkaStreams`. But you do not test the 
`then` branch of the same `if` statement, and within that `then` branch there 
are again a `then` branch and an `else` branch to test. 
   If you drop the inner `else` branch as I proposed in another comment, you 
need to test the exception that is thrown when `null` is passed to the 
method. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9273: KAFKA-9331: changes for Streams uncaught exception handler

2020-10-08 Thread GitBox


cadonna commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r501793433



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -364,6 +370,73 @@ public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads; 
{@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = throwable -> 
handleStreamsUncaughtException(throwable, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+for (final StreamThread thread : threads) {
+if (streamsUncaughtExceptionHandler != null)  {

Review comment:
   I do not think we need to emulate the behavior of the java uncaught 
exception handler here. I do not see why a user should try to reset the 
uncaught exception handler. If we receive this requirement, we can still add 
it. I have the impression the "reset" makes this code unnecessarily complex.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #9271: MINOR: correct package of LinuxIoMetricsCollector

2020-10-08 Thread GitBox


mimaison merged pull request #9271:
URL: https://github.com/apache/kafka/pull/9271


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector

2020-10-08 Thread GitBox


mimaison commented on pull request #9271:
URL: https://github.com/apache/kafka/pull/9271#issuecomment-705628291


   Test failures are not related and passed locally, merging



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] acmck closed pull request #9394: Fully automate dev setup with Gitpod

2020-10-08 Thread GitBox


acmck closed pull request #9394:
URL: https://github.com/apache/kafka/pull/9394


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] acmck opened a new pull request #9394: Fully automate dev setup with Gitpod

2020-10-08 Thread GitBox


acmck opened a new pull request #9394:
URL: https://github.com/apache/kafka/pull/9394


   This commit implements a fully-automated development setup using Gitpod.io, 
an
   online IDE for GitLab, GitHub, and Bitbucket that enables 
Dev-Environments-As-Code.
   This makes it easy for anyone to get a ready-to-code workspace for any 
branch,
   issue or pull request almost instantly with a single click.
   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10585) Kafka Streams should clean up the state store directory from cleanup

2020-10-08 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17210251#comment-17210251
 ] 

Dongjin Lee commented on KAFKA-10585:
-

I inspected this issue a little bit. You mean that everything under 
`{state.dir}/{application-id}/` gets deleted, but the 
`{state.dir}/{application-id}` directory itself is not deleted during cleanup, 
so it keeps occupying filesystem resources.

Do I understand correctly?

> Kafka Streams should clean up the state store directory from cleanup
> 
>
> Key: KAFKA-10585
> URL: https://issues.apache.org/jira/browse/KAFKA-10585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Rohan Desai
>Priority: Major
>
> Currently, `KafkaStreams.cleanup` cleans up all the task-level directories 
> and the global directory. However it doesn't clean up the enclosing state 
> store directory, though streams does create this directory when it 
> initializes the state for the streams app. Feels like it should remove this 
> directory when it cleans up.
> We notice this in ksql quite often, since every new query is a new streams 
> app. Over time, we see lots of state store directories left around for old 
> queries.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10586) MirrorMaker 2.0 REST support

2020-10-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-10586:


 Summary: MirrorMaker 2.0 REST support
 Key: KAFKA-10586
 URL: https://issues.apache.org/jira/browse/KAFKA-10586
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban


KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that 
with specific workloads, the dedicated MM2 cluster can become unable to react 
to dynamic topic and group filter changes.

(This occurs when, after a rebalance operation, the leader node has no 
MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is 
stopped on the leader, meaning it cannot detect config changes by itself. 
Followers still running the connector can detect config changes, but they 
cannot query the leader for config updates.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison merged pull request #9296: MINOR: remove unused scala files from core module

2020-10-08 Thread GitBox


mimaison merged pull request #9296:
URL: https://github.com/apache/kafka/pull/9296


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



