[GitHub] [kafka] github-actions[bot] commented on pull request #12806: KAFKA-14345: Fix flakiness with more accurate bound in (Dynamic)ConnectionQuotaTest

2023-07-14 Thread via GitHub


github-actions[bot] commented on PR #12806:
URL: https://github.com/apache/kafka/pull/12806#issuecomment-1636643207

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #13703: MINOR: Standardize controller log4j output for replaying records

2023-07-14 Thread via GitHub


ijuma commented on PR #13703:
URL: https://github.com/apache/kafka/pull/13703#issuecomment-1636638363

   +1 @jolshan - is there a reason why people are not checking the build 
result? Particularly when it doesn't even compile.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14483) Move LocalLog to storage module

2023-07-14 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-14483:
--

Assignee: Satish Duggana

> Move LocalLog to storage module
> ---
>
> Key: KAFKA-14483
> URL: https://issues.apache.org/jira/browse/KAFKA-14483
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] bmscomp opened a new pull request, #14022: MINOR: Fix typo in java documentation

2023-07-14 Thread via GitHub


bmscomp opened a new pull request, #14022:
URL: https://github.com/apache/kafka/pull/14022

   This is a very minimal pull request that consists of fixing a minor typo in 
the javadoc.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp opened a new pull request, #14021: MINOR: remove unused variable in examples

2023-07-14 Thread via GitHub


bmscomp opened a new pull request, #14021:
URL: https://github.com/apache/kafka/pull/14021

   This is a very minor pull request that consists of removing unused 
variables in the examples.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on PR #13971:
URL: https://github.com/apache/kafka/pull/13971#issuecomment-1636560477

   This appears to have some consistent-looking JDK8 failures that I'll have to 
replicate locally. I've been testing with JDK11, so it's possible that the 
JDK8 ServiceLoader has some slightly different behavior, since it was 
overhauled in JDK9.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #14020: KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure

2023-07-14 Thread via GitHub


gharris1727 opened a new pull request, #14020:
URL: https://github.com/apache/kafka/pull/14020

   ## Summary
   
   The Reflections library has a race condition that causes it to sometimes 
throw an NPE. This NPE crashes the connect worker on startup, both in live 
environments and in tests, and causes those tests to be flaky. This PR uses 
reflection to patch the library and eliminate the race condition. This is done 
instead of upstreaming the patch or forking the library because the library 
itself is unmaintained and should be phased out.
   
   ## Background
   
   The Reflections library makes use of a data structure `Store` to store the 
results of scanning for later querying. The scanner writes to the store during 
`Reflections#scan()` via the `SubTypesScanner`. The store is later queried by 
`Reflections#getSubTypesOf`.
   
   Due to the slow speed of reflectively discovering all classes on the 
classpath and plugin.path, the Reflections library is used with a parallel 
executor, which increases the scanning speed. Unfortunately, the parallel mode 
of the library has some bugs, one of which has already been patched via the 
InternalReflections subclass.
   
   The parallel mode causes the Store to receive concurrent writes. The javadoc 
for the class does not specify that it is or isn't thread-safe, but due to the 
use of ConcurrentHashMap and the support for parallel scanning in Reflections, 
the class seems intended to be thread-safe.
   
   ## Symptoms
   
   The failure appears as the following stack trace.
   ```
   java.lang.NullPointerException
   at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
   at org.reflections.Store.getAllIncluding(Store.java:82)
   at org.reflections.Store.getAll(Store.java:93)
   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
   at 
org.apache.kafka.connect.runtime.isolation.Plugins.(Plugins.java:61)
   at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
   ```
   The stack trace refers to the old location of this code, but the failure 
persists in its new location. The line numbers inside the Reflections library 
are still accurate, since we haven't upgraded the version of this library in 
several years.
   
   ## Diagnosis
   
   From the stacktrace, the NPE is caused by the argument of 
`ConcurrentHashMap#get(String)` being null. Tracing backwards, this value is 
ultimately read from `Store#storeMap`, meaning it contains a null in the 
innermost Collection. Also, it contains this null after all of the concurrent 
scanning has finished (see `Reflections#scan` where it waits for all of the 
submitted futures) so there are no writes racing with the reads. **The 
`Store#storeMap` contains a null at the end of scanning**.
   
   Analyzing the data flow into the `Store#storeMap`, we can see that there is 
only one method which writes to it: `Store#put(String, String, String)`, where 
the last argument is added into the innermost collection. This method is called 
in various places: 
   * `Reflections#expandSuperTypes` always non-null, and access is 
single-threaded
   * `Store#merge` (not used) null iff already null in the other Store, and 
access is single-threaded
   * `XmlSerializer#read` (not used) null iff null in the XML file, and access 
is single-threaded
   * `AbstractScanner#put` always non-null, **but the method is called 
concurrently**
   
   Because the argument appears to be non-null on all active code-paths, it 
doesn't appear that the null could be coming from a caller. Because there are 
no null values upon entering the method, and there are when exiting, I believe 
the method itself must be introducing a null. The only interesting property of 
this method is that it is called concurrently, so there could be a concurrency 
bug.
   
   Following the hypothesis that this is due to concurrency, I looked for 
potentially non-thread-safe stuff in this method implementation:
   ```java
   public boolean put(String index, String key, String value) {
       return storeMap.computeIfAbsent(index, s -> new ConcurrentHashMap<>())
               .computeIfAbsent(key, s -> new ArrayList<>())
               .add(value);
   }
   ```
   
   The innermost Collection turns out to be a non-thread-safe ArrayList 
instance. From the Arr
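   A minimal standalone sketch of the race described above (illustrative only, 
not code from this PR): two threads exercise the same pattern that `Store#put` 
uses, a thread-safe outer map whose inner values are plain `ArrayList`s. Under 
contention the unsynchronized `ArrayList#add` can lose elements, leave null 
slots, or throw, which matches the null later observed by 
`Store#getAllIncluding`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ConcurrentHashMap;
   
   public class StoreRaceSketch {
       public static void main(String[] args) throws InterruptedException {
           ConcurrentHashMap<String, List<String>> storeMap = new ConcurrentHashMap<>();
           Runnable writer = () -> {
               for (int i = 0; i < 100_000; i++) {
                   // Same shape as Store#put: thread-safe outer map, unsynchronized inner list.
                   storeMap.computeIfAbsent("index", k -> new ArrayList<>()).add("value-" + i);
               }
           };
           Thread t1 = new Thread(writer);
           Thread t2 = new Thread(writer);
           t1.start();
           t2.start();
           t1.join();
           t2.join();
           List<String> values = storeMap.get("index");
           // With a plain ArrayList the size is frequently wrong and null slots can appear
           // (the add may also throw ArrayIndexOutOfBoundsException mid-run).
           System.out.println("size=" + values.size() + ", containsNull=" + values.contains(null));
       }
   }
   ```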

[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1264232306


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1076,10 +1073,10 @@ public void handleLeaderChange(LeaderAndEpoch newLeader) {
 renounce();
 }
 } else if (newLeader.isLeader(nodeId)) {
-log.info("Becoming the active controller at epoch {}, committed offset {}, " +
-"committed epoch {}", newLeader.epoch(), lastCommittedOffset,
-lastCommittedEpoch);
-claim(newLeader.epoch());
+long newLastWriteOffset = endOffset - 1;

Review Comment:
   The endOffset comes directly from the log and is as described... the end 
offset (exclusive).
   
   Thinking about it more, I don't think I need to assume that it's committed, 
so I won't.
   
   But it is used to calculate the next offset that the active controller 
should try to write to.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15190:
---
Labels: needs-kip  (was: kip)

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID to the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-15190:
---
Labels: kip  (was: )

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>  Labels: kip
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID to the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743313#comment-17743313
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


I would be a bit hesitant to overly rely on the group.instance.id because not 
everyone uses or wants static membership, and that config is completely coupled 
to the feature. Perhaps we can reuse the group.instance.id as the process id 
only if/when static membership is already being used, which would not 
necessarily even require a KIP (maybe), but we'd still need to introduce a new 
config for the general use case.

It's a bummer because of course, practically speaking, this new config would 
have exactly the same meaning as the group.instance.id – a unique, persistent 
identifier for each client. It would have been the perfect config for this use 
case if not for Kafka's habit of being overly clever about reusing configs to 
enable/disable the related feature, in addition to their actual usage.

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID to the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan merged pull request #13787: KAFKA-14884: Include check transaction is still ongoing right before append (take 2)

2023-07-14 Thread via GitHub


jolshan merged PR #13787:
URL: https://github.com/apache/kafka/pull/13787


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743311#comment-17743311
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15190:


I'm all for this, though it will need a KIP. Would you be interested in writing 
one? Happy to help you with the process if so.

In the meantime, perhaps you guys can get some relief by just writing this 
processId directly during setup, before starting the Streams app? I believe it 
just expects a plain UUID at the moment, so you should be able to write a 
function that hashes this container id to something of that form and then 
persist it to disk in exactly the same way as Streams does.
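
A rough sketch of that workaround (illustrative only: the state directory 
location, metadata file name, and JSON layout below are assumptions, so check 
the Streams StateDirectory code for the actual on-disk format before relying 
on this):

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

public class ProcessIdSeeder {
    public static void main(String[] args) throws Exception {
        // Stable per-replica identity, e.g. the pod name from a StatefulSet.
        String containerId = System.getenv("HOSTNAME");
        // Hash the container id into a name-based (type 3) UUID so restarts of
        // the same replica always produce the same process id.
        UUID processId = UUID.nameUUIDFromBytes(containerId.getBytes(StandardCharsets.UTF_8));

        // Hypothetical path and file format; Streams normally manages this file itself.
        Path appStateDir = Paths.get("/var/lib/kafka-streams", "my-streams-app");
        Files.createDirectories(appStateDir);
        Files.write(appStateDir.resolve("kafka-streams-process-metadata"),
                ("{\"processId\":\"" + processId + "\"}").getBytes(StandardCharsets.UTF_8));
        System.out.println("Seeded process id " + processId + " for " + containerId);
    }
}
```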

> Allow configuring a streams process ID
> --
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Joe Wreschnig
>Priority: Major
>
> We run our Kafka Streams applications in containers with no persistent 
> storage, and therefore the mitigation of persisting the process ID to the state 
> directory in KAFKA-10716 does not help us avoid shuffling lots of tasks during 
> restarts.
> However, we do have a persistent container ID (from a Kubernetes 
> StatefulSet). Would it be possible to expose a configuration option to let us 
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense 
> to have the process ID be automatically derived from this (plus 
> application/client IDs) if it's set? The two IDs seem to have overlapping 
> goals of identifying "this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15192) Network thread receives exception when updating request metrics

2023-07-14 Thread David Mao (Jira)
David Mao created KAFKA-15192:
-

 Summary: Network thread receives exception when updating request 
metrics
 Key: KAFKA-15192
 URL: https://issues.apache.org/jira/browse/KAFKA-15192
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Reporter: David Mao


We noticed an exception being thrown from the network threads when updating 
some of the request histograms. Example stack trace:

 
java.util.NoSuchElementException
at 
java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:2064)
at 
com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:102)
at 
com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:81)
at com.yammer.metrics.core.Histogram.update(Histogram.java:110)
 

Searching the error I found a similar ticket resolved in Cassandra by updating 
their dropwizard dependency to pull in 
[https://github.com/dropwizard/metrics/pull/1436]. 
https://issues.apache.org/jira/browse/CASSANDRA-15472

Kafka currently still uses yammer metrics, so we would need to take 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-510%3A+Metrics+library+upgrade]
 forward to upgrade to a dropwizard version that fixes this issue.
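
For reference, the first frame of that stack trace is easy to reproduce in 
isolation: `ConcurrentSkipListMap#firstKey` throws `NoSuchElementException` 
when the map is empty, which is presumably what the sample hits when another 
thread empties its internal map concurrently (a minimal sketch, not Kafka 
code):

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class FirstKeyDemo {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Double, Long> values = new ConcurrentSkipListMap<>();
        // On an empty map this throws java.util.NoSuchElementException, matching
        // the first frame of the stack trace above.
        values.firstKey();
    }
}
```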



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mumrah commented on pull request #14008: MINOR Improve logging during the ZK to KRaft migration

2023-07-14 Thread via GitHub


mumrah commented on PR #14008:
URL: https://github.com/apache/kafka/pull/14008#issuecomment-1636474239

   Test failures seem to be unrelated flakiness.
   
   (Screenshot attached to the original comment: 
https://github.com/apache/kafka/assets/55116/6d6dc32c-e6da-4e43-9ef6-9c6278ccce78)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a diff in pull request #13993: KAFKA-15178: Improve ConsumerCoordinator.poll perf

2023-07-14 Thread via GitHub


ableegoldman commented on code in PR #13993:
URL: https://github.com/apache/kafka/pull/13993#discussion_r1264175800


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -119,6 +119,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 private Set joinedSubscription;
 private MetadataSnapshot metadataSnapshot;
 private MetadataSnapshot assignmentSnapshot;
+private boolean metadataUpdated;

Review Comment:
   I think I would personally need to see benchmarks (somehow) to be convinced 
that just changing it to a `LinkedHashSet` is sufficient as an optimization, or 
even just guaranteed not to be a regression. Speaking from experience, I once 
tried to optimize the cache in Streams for better range-scan performance which 
involved swapping out the underlying data structure for a `LinkedHashMap`. The 
consequences were pretty dire, with the `LinkedHashMap` having comparably 
terrible performance characteristics when scaling up the keyspace. Granted it's 
not the exact same scenario because that was a map and also the Concurrent 
variant which surely means the scaling characteristics are not going to be 
exactly the same. But it has made me incredibly suspicious of the 
`LinkedHashXXX` data structures for use cases like this.
   
   All that said, if this is literally only used for an equality check and 
nothing else, why not just hash it or something like that?
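   
   To make that last idea concrete, here is a rough sketch of the "just hash 
it" approach (illustrative only; note that comparing hash codes can collide, 
so real code would still want a cheap fallback to a full equality check):
   
   ```java
   import java.util.Set;
   
   public class SubscriptionFingerprint {
       private int lastSeenHash;
   
       /**
        * Returns true if the topic set looks different from the last one seen.
        * Only the hash is retained, so no defensive copy of the set is needed.
        */
       public boolean updateIfChanged(Set<String> topics) {
           int newHash = topics.hashCode();
           if (newHash == lastSeenHash) {
               return false; // very likely unchanged (modulo hash collisions)
           }
           lastSeenHash = newHash;
           return true;
       }
   }
   ```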



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1264075402


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   I looked more into how the Reflections library handles this, and it actually 
just WARN logs these classes and never shows them to us, so we don't even get 
the opportunity to log the error ourselves:
   ```
   [2023-07-14 11:54:26,418] WARN could not get type for name 
test.plugins.MissingSuperclassConverter from any class loader 
(org.reflections.Reflections:318)
   org.reflections.ReflectionsException: could not get type for name 
test.plugins.MissingSuperclassConverter
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at 
org.reflections.ReflectionUtils.lambda$forNames$22(ReflectionUtils.java:330)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at 
java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1621)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:332)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
at 
org.apache.kafka.connect.runtime.isolation.ReflectionScanner.getPluginDesc(ReflectionScanner.java:118)
at 
org.apache.kafka.connect.runtime.isolation.ReflectionScanner.scanPlugins(ReflectionScanner.java:91)
at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:78)
at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:66)
   Caused by: java.lang.NoClassDefFoundError: test/plugins/NonExis

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1264131073


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   Oh and just to motivate the tweaks I made to your implementation:
   If the ServiceLoader implementation can skip over some LinkageErrors, I 
think it's possible to have two different plugins throw the same exception and 
accidentally trigger the equals condition.
   
   For example, if two plugins implemented NonExistentInterface and appeared 
consecutively in the ServiceLoader manifest, I think the logic should show at 
least one of the errors, but not fail the worker or shadow other plugins. I 
think this would typically happen if a plugin packages both a Source and Sink 
that both have the same missing dependency, but the ServiceLoader 
implementation could continue to make progress.
   
   There's a heuristic for 100 consecutive failures: if you package 100 
consecutive faulty plugins (or have them all on the classpath) then it falls 
back to failing the worker. I don't think I've seen more than ~5-10 connectors 
and ~20-30 Transforms packaged together. If the errors are non-consecutive then 
the counter resets as well, so the actual number of tolerated hasNext calls 
could be quite large, covering most use-cases.
   
   The heuristic is just there to prevent infinite loops in the 
ServiceLoader-not-making-progress case. If the exception message happens to 
contain a memory address, or the exception alternates between two different 
messages, we should still fail the worker.
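   
   Roughly, the consecutive-failure cap has this shape (an illustrative sketch, 
not the PR's actual code; the names and threshold here are made up):
   
   ```java
   import java.util.Iterator;
   
   public class SafeServiceLoaderIteration {
       private static final int MAX_CONSECUTIVE_FAILURES = 100;
   
       public static <T> boolean hasNextSafely(Iterator<T> it) {
           int consecutiveFailures = 0;
           while (true) {
               try {
                   // A successful call ends (and resets) the failure streak.
                   return it.hasNext();
               } catch (LinkageError e) {
                   consecutiveFailures++;
                   System.err.println("Unloadable plugin skipped: " + e);
                   if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
                       // Assume the iterator is no longer making progress and give up,
                       // which fails the worker instead of looping forever.
                       throw e;
                   }
               }
           }
       }
   }
   ```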



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14009: MINOR: Add dual write offset metric

2023-07-14 Thread via GitHub


mumrah commented on code in PR #14009:
URL: https://github.com/apache/kafka/pull/14009#discussion_r1264114923


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -109,6 +113,18 @@ public Long value() {
 return time.milliseconds() - lastAppliedRecordTimestamp();
 }
 }));
+
+if (zkMigrationState) {
+registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new 
Gauge() {
+@Override
+public Long value() {
+// not in dual-write mode: set metric value to 0
+if (dualWriteOffset() == 0) return 0L;
+// in dual write mode
+return lastCommittedRecordOffset() - dualWriteOffset();

Review Comment:
   style nit: let's just do an if and else here rather than the fall-through 
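   
   i.e. something along these lines (just sketching the requested shape for the 
`value()` method quoted above, with the same behavior as the fall-through 
version):
   
   ```java
   public Long value() {
       if (dualWriteOffset() == 0) {
           // not in dual-write mode: report 0
           return 0L;
       } else {
           // in dual-write mode: report how far the ZK dual write lags behind
           return lastCommittedRecordOffset() - dualWriteOffset();
       }
   }
   ```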



##
metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java:
##
@@ -57,11 +57,37 @@ public void testMetricNames() {
 }
 }
 
+@Test
+public void testMetricNamesInMigrationState() {
+MetricsRegistry registry = new MetricsRegistry();
+MockTime time = new MockTime();
+try {
+try (QuorumControllerMetrics metrics = new 
QuorumControllerMetrics(Optional.of(registry), time, true)) {
+ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.controller",
+new HashSet<>(Arrays.asList(
+
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
+
"kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+
"kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+
"kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+
"kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
+)));
+}

Review Comment:
   nit: too much whitespace. each indentation level should be 4 spaces (no tabs)



##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##
@@ -214,8 +233,9 @@ public void testOnlySendNeededRPCsToBrokers() throws 
Exception {
 metadataPublisher -> { },
 new MockFaultHandler("test"),
 quorumFeatures,
+metrics,
 mockTime
-)) {
+)) {

Review Comment:
   nit: looks like your IDE maybe reformatted some things. can you revert the 
whitespace and formatting changes?



##
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##
@@ -344,8 +365,8 @@ public void 
testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate()
 },
 new MockFaultHandler("test"),
 quorumFeatures,
-mockTime
-)) {
+metrics,
+mockTime)) {

Review Comment:
   nit: let's keep the style like:
   
   ```
   foo(
   
   ) {
   
   }
   ```
   
   so in this case, the `)) {` should stay on its own line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1264131073


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   Oh and just to motivate the tweaks I made to your implementation:
   If the ServiceLoader implementation can skip over some LinkageErrors, I 
think it's possible to have two different plugins throw the same exception and 
accidentally trigger the equals condition.
   
   For example, if two plugins implemented NonExistentInterface and appeared 
consecutively in the ServiceLoader manifest, I think the logic should show at 
least one of the errors, but not fail the worker or shadow other plugins. I 
think this would typically happen if a plugin packages both a Source and Sink 
that both have the same missing dependency, but the ServiceLoader 
implementation could continue to make progress.
   
   There's a heuristic for 100 consecutive failures: if you package 100 
consecutive faulty plugins (or have them all on the classpath) then it falls 
back to failing the worker. I don't think I've seen more than ~5-10 connectors 
and ~20-30 Transforms packaged together. If the errors are non-consecutive then 
the counter resets as well, so the actual number of tolerated hasNext calls 
could be quite large, covering most use-cases.
   
   The heuristic is just there to prevent infinite loops if the exception 
message happens to contain a memory address, or the exception alternates 
between two different messages in the ServiceLoader-not-making-progress case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict

2023-07-14 Thread via GitHub


philipnee commented on PR #13920:
URL: https://github.com/apache/kafka/pull/13920#issuecomment-1636359925

   @flashmouse thanks for the PR - I was the one who wrote the if-else if-else 
block. I'm a bit surprised that I actually made this mistake, so I held it off 
for a few days just to make sure. As you have already provided a case that 
breaks the existing logic, could you implement it as part of the tests? Also, 
could you provide a test case that fails `if (consumerPartitionCount < 
otherConsumerPartitionCount) {` but succeeds with `if (consumerPartitionCount + 1 
< otherConsumerPartitionCount) {`?
   
   This would help massively for the reviews and documentation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-14 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264075041


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -376,19 +448,28 @@ public 
CoordinatorResult consumerGro
 return result;
 }
 
-public List> sleep(long 
ms) {
+public List> sleep(long ms) {
 time.sleep(ms);
-List> timeouts = 
timer.poll();
-timeouts.forEach(timeout -> timeout.records.forEach(this::replay));
+List> timeouts = timer.poll();
+timeouts.forEach(timeout -> {
+if (timeout.result.replayRecords()) {
+timeout.result.records().forEach(this::replay);
+}
+});
 return timeouts;
 }
 
-public MockCoordinatorTimer.ScheduledTimeout 
assertSessionTimeout(
+public void sleepAndAssertEmptyResult(long ms) {
+List> timeouts = sleep(ms);
+timeouts.forEach(timeout -> assertEquals(EMPTY_RESULT, 
timeout.result));
+}

Review Comment:
   I will keep it as assertEmptyResult, since the timeout itself is not 
necessarily empty, but we want to assert that the coordinator result is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp opened a new pull request, #14019: MINOR: optimization of equals and hashcode of GlobalVisitor class

2023-07-14 Thread via GitHub


bmscomp opened a new pull request, #14019:
URL: https://github.com/apache/kafka/pull/14019

   Optimize equals and hashCode methods for `GlobalVisitor` class 
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bmscomp opened a new pull request, #14018: Optimization of equals methods on implementations of Commands.Handler in shell.command package

2023-07-14 Thread via GitHub


bmscomp opened a new pull request, #14018:
URL: https://github.com/apache/kafka/pull/14018

   Optimization of the equals method on multiple implementations of 
`Commands.Handler` in the `shell.command` package, to make it less verbose.
   
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263971924


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -740,22 +735,24 @@ public Long apply(List records) {
 // Start by trying to apply the record to our 
in-memory state. This should always
 // succeed; if it does not, that's a fatal error. 
It is important to do this before
 // scheduling the record for Raft replication.
-int i = 1;
+int i = 0;

Review Comment:
   It's just an index. I can call it `recordIndex`.
   
   I will revise the error message a bit too. The clunky wording from before 
reflected the fact that we didn't really know the offset.



##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -973,14 +970,14 @@ public void 
handleCommit(BatchReader reader) {
 log.debug("Replaying commits from the active 
node up to " +
 "offset {} and epoch {}.", offset, epoch);
 }
-int i = 1;
+int i = 0;

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


C0urante commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1263968763


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   Of the approaches you've laid out, I'm -1 on 1 (the coverage provided by 
these cases is important, especially if it leads to discoveries like this one) 
and 2 (if a user runs into this it's going to be a massive headache to debug 
since they'll see plugin loading errors for more than just the broken plugin).
   
   I agree that it's a bit early to go about implementing our own service 
loading logic, so although 3 is reasonable in theory, we can hold off on it for 
now.
   
   I don't love 4, since it's still a step backwards and killing workers on 
startup is quite extreme. I think I could live with it if we believe there are 
no better alternatives.
   
   I did some digging and it seems like this is due to a known bug in the 
OpenJDK `ServiceLoader`: https://bugs.openjdk.org/browse/JDK-8196182 (unclear 
if this also affects other JDK distributions).
   
   Given that we're dealing with buggy behavior in a popular JDK distribution, 
it seems reasonable to try to deal with this scenario gracefully. I prototyped 
a local workaround for this that's a little hacky but does cause our tests to 
pass. We can wrap calls to `Iterator::hasNext` in some error-handling logic:
   
   ```java
   private boolean serviceLoaderHasNext(Iterator serviceLoaderIterator) {
   try {
   return serviceLoaderIterator.hasNext();
   } catch (LinkageError e1) {
   log.error("Failed to scan for next service loaded plugin", e1);
   try {
   return serviceLoaderIterator.hasNext();
   } catch (LinkageError e2) {
   // It's difficult to know for sure if the iterator was able to 
advance past the first broken
   // plugin class, or if it will continue to fail on that broken 
class for any subsequent calls
   // to Iterator::hasNext or Iterator::next
   // As a best-effort measure, we compare exception messages with 
the assumption that
   // they will include the na

[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263966038


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -458,38 +459,32 @@ private void handleEventEnd(String name, long 
startProcessingTimeNs) {
 
controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
 }
 
-private Throwable handleEventException(String name,
-   OptionalLong startProcessingTimeNs,
-   Throwable exception) {
-Throwable externalException =
-ControllerExceptions.toExternalException(exception, () -> 
latestController());
-if (!startProcessingTimeNs.isPresent()) {
-log.error("{}: unable to start processing because of {}. Reason: 
{}", name,
-exception.getClass().getSimpleName(), exception.getMessage());
-return externalException;
-}
-long endProcessingTime = time.nanoseconds();
-long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
-long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-if (ControllerExceptions.isExpected(exception)) {
-log.info("{}: failed with {} in {} us. Reason: {}", name,
-exception.getClass().getSimpleName(), deltaUs, 
exception.getMessage());
-return externalException;
+private Throwable handleEventException(
+String name,
+OptionalLong startProcessingTimeNs,
+Throwable exception
+) {
+OptionalLong deltaUs;
+if (startProcessingTimeNs.isPresent()) {
+long endProcessingTime = time.nanoseconds();
+long deltaNs = endProcessingTime - 
startProcessingTimeNs.getAsLong();
+deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, 
NANOSECONDS));
+} else {
+deltaUs = OptionalLong.empty();
+}
+EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
+fromInternal(exception, () -> latestController());
+String failureMessage = info.failureMessage(lastCommittedEpoch, 
deltaUs,
+isActiveController(), lastCommittedOffset);
+if (info.isFault()) {
+nonFatalFaultHandler.handleFault(name + ": " + failureMessage, 
exception);
+} else {
+log.info("{}: {}", name, failureMessage);

Review Comment:
   In general if something isn't a fault, it shouldn't be logged at ERROR level.
   
   For example, if someone tries to create a topic but one already exists with 
that name, that should not cause ERROR messages in the controller.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263963588


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.
+ */
+private final boolean isFault;
+
+/**
+ * True if this exception should cause a controller failover.
+ * All faults cause failover
+ */
+private final boolean causesFailover;
+
+/**
+ * The internal exception.
+ */
+private final Throwable internalException;
+
+/**
+ * The exception to present to RPC callers, or Optional.empty if the 
internal exception should
+ * be presented directly.
+ */
+private final Optional externalException;
+
+/**
+ * Create an EventHandlerExceptionInfo object from an internal exception.
+ *
+ * @param internal  The internal exception.
+ * @param latestControllerSupplier  A function we can call to obtain the 
latest leader id.
+ *
+ * @return  The new immutable info object.
+ */
+public static EventHandlerExceptionInfo fromInternal(
+Throwable internal,
+Supplier latestControllerSupplier
+) {
+if (internal instanceof ApiException) {
+// This exception is a standard API error response from the 
controller, which can pass
+// through without modification.
+return new EventHandlerExceptionInfo(false, false, internal);
+} else if (internal instanceof NotLeaderException) {
+// The controller has lost leadership.
+return new EventHandlerExceptionInfo(false, true, internal,
+
ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+} else if (internal instanceof RejectedExecutionException) {
+// The controller event queue is shutting down.
+return new EventHandlerExceptionInfo(false, false, internal,
+new TimeoutException("The controller is shutting down.", 
internal));
+} else if (internal instanceof BoundedListTooLongException) {
+// The operation could not be performed because it would have 
created an overly large
+// batch.
+return new EventHandlerExceptionInfo(false, false, internal,
+new PolicyViolationException("Unable to perform excessively 
large batch " +
+"operation."));
+} else if (internal instanceof UnexpectedEndOffsetException) {
+// The active controller picked the wrong end offset for its next 
batch. It must now
+// fail over. This should be pretty rare.
+return new EventHandlerExceptionInfo(false, true, internal,
+new NotControllerException("Unexpected end offset. Controller 
not known."));
+} else if (internal instanceof InterruptedException) {
+// The controller event queue has been interrupted. This normally 
only happens during
+// a JUnit test that has hung. The test framework sometimes sends 
an InterruptException
+// to all threads to try to get them to shut down. This isn't the 
correct way to shut
+// the test, but it may happen if someth

[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263962510


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263961378


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.
+ */
+private final boolean isFault;
+
+/**
+ * True if this exception should cause a controller failover.
+ * All faults cause failover
+ */
+private final boolean causesFailover;
+
+/**
+ * The internal exception.
+ */
+private final Throwable internalException;
+
+/**
+ * The exception to present to RPC callers, or Optional.empty if the 
internal exception should
+ * be presented directly.
+ */
+private final Optional externalException;
+
+/**
+ * Create an EventHandlerExceptionInfo object from an internal exception.
+ *
+ * @param internal  The internal exception.
+ * @param latestControllerSupplier  A function we can call to obtain the 
latest leader id.
+ *
+ * @return  The new immutable info object.
+ */
+public static EventHandlerExceptionInfo fromInternal(
+Throwable internal,
+Supplier latestControllerSupplier
+) {
+if (internal instanceof ApiException) {
+// This exception is a standard API error response from the 
controller, which can pass
+// through without modification.
+return new EventHandlerExceptionInfo(false, false, internal);
+} else if (internal instanceof NotLeaderException) {
+// The controller has lost leadership.
+return new EventHandlerExceptionInfo(false, true, internal,
+
ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+} else if (internal instanceof RejectedExecutionException) {
+// The controller event queue is shutting down.
+return new EventHandlerExceptionInfo(false, false, internal,
+new TimeoutException("The controller is shutting down.", 
internal));
+} else if (internal instanceof BoundedListTooLongException) {
+// The operation could not be performed because it would have 
created an overly large
+// batch.
+return new EventHandlerExceptionInfo(false, false, internal,
+new PolicyViolationException("Unable to perform excessively 
large batch " +
+"operation."));
+} else if (internal instanceof UnexpectedEndOffsetException) {
+// The active controller picked the wrong end offset for its next 
batch. It must now
+// fail over. This should be pretty rare.
+return new EventHandlerExceptionInfo(false, true, internal,
+new NotControllerException("Unexpected end offset. Controller 
not known."));

Review Comment:
   Yes, "will resign" is ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] DL1231 commented on a diff in pull request #13969: KAFKA-15154: Acquire lock when reading checkQuotas

2023-07-14 Thread via GitHub


DL1231 commented on code in PR #13969:
URL: https://github.com/apache/kafka/pull/13969#discussion_r1263954988


##
clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java:
##
@@ -366,4 +368,60 @@ public void 
testUpdatingMetricConfigIsReflectedInTheSensor() {
 
 metrics.close();
 }
+
+@Test
+public void testConcurrentReadWriteAccessForMetrics() throws 
ExecutionException, InterruptedException {
+final Metrics metrics = new Metrics(new 
MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+.timeWindow(1, TimeUnit.MILLISECONDS)
+.samples(100));
+final Sensor sensor = metrics.sensor("sensor");
+
+final int metricCount = 1;
+// Add a large number of metrics to increase the execution time of 
checkQuotas
+for (int i = 0; i < metricCount; i++) {
+sensor.add(metrics.metricName("test-metric" + i, "test-group" + 
i), new Rate());
+}
+sensor.record(10, 1, false);
+CountDownLatch latch = new CountDownLatch(1);
+ExecutorService executor = Executors.newFixedThreadPool(2);
+
+// Use non-thread-safe methods for concurrent read and write of 
metrics.
+Future worker = executor.submit(() -> {
+try {
+sensor.checkQuotasNonThreadSafe(3);
+return null;
+} catch (Throwable e) {
+return e;
+}
+});
+executor.submit(() -> {
+sensor.add(metrics.metricName("test-metric-non-thread-safe", 
"test-group-non-thread-safe"), new Rate());
+latch.countDown();
+});
+
+assertTrue(latch.await(5, TimeUnit.SECONDS), "If this failure happen 
frequently, we can try to increase the wait time");
+assertTrue(worker.isDone());
+assertNotNull(worker.get());
+assertEquals(ConcurrentModificationException.class, 
worker.get().getClass());

Review Comment:
   @divijvaidya thank you very much for your explanation. However, I still have 
a question. Both 'add' and 'checkQuotas' are synchronized, so how can they be 
accessed simultaneously? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup

2023-07-14 Thread via GitHub


mumrah commented on PR #13975:
URL: https://github.com/apache/kafka/pull/13975#issuecomment-1636131929

   @viktorsomogyi looking a bit more, how does this PR solve the issue in the 
JIRA? It seems like this PR will guard against topic creation happening on a 
controller without an initialized metadata cache. The JIRA refers to a connect 
worker getting an exception when updating metadata, which would be served by 
any broker. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets

2023-07-14 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-15182:
---
Description: 
See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader}} after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{{}SourceConnector::alterOffsets{}}}.

  was:
See discussion 
[here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]

 

TLDR: When users attempt to externally modify source connector offsets via the 
{{PATCH /offsets}} endpoint (introduced in 
[KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
 type mismatches can occur between offsets passed to 
{{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
connectors / tasks via an instance of {{OffsetStorageReader }}after the offsets 
have been modified. In order to prevent this type mismatch that could lead to 
subtle bugs in connectors, we could serialize + deserialize the offsets using 
the worker's internal JSON converter before invoking 
{{{}SourceConnector::alterOffsets{}}}.


> Normalize offsets before invoking SourceConnector::alterOffsets
> ---
>
> Key: KAFKA-15182
> URL: https://issues.apache.org/jira/browse/KAFKA-15182
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> See discussion 
> [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]
>  
> TLDR: When users attempt to externally modify source connector offsets via 
> the {{PATCH /offsets}} endpoint (introduced in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
>  type mismatches can occur between offsets passed to 
> {{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
> connectors / tasks via an instance of {{OffsetStorageReader}} after the 
> offsets have been modified. In order to prevent this type mismatch that could 
> lead to subtle bugs in connectors, we could serialize + deserialize the 
> offsets using the worker's internal JSON converter before invoking 
> {{{}SourceConnector::alterOffsets{}}}.
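
For illustration, a rough sketch of what that normalization round trip could 
look like (only an assumption about the approach, not the actual worker code: 
the helper class below is hypothetical, and it assumes the worker's internal 
JSON converter is used with schemas disabled):

```java
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.Collections;
import java.util.Map;

public class OffsetNormalizationSketch {
    // Hypothetical helper: round-trip an offset map through the JSON converter so
    // that the values handed to SourceConnector::alterOffsets carry the same Java
    // types that an OffsetStorageReader would return after the offsets are written.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> normalize(Map<String, Object> offset) {
        JsonConverter converter = new JsonConverter();
        // isKey = false; the topic name is irrelevant for this conversion.
        converter.configure(
            Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
        byte[] serialized = converter.fromConnectData("", null, offset);
        return (Map<String, Object>) converter.toConnectData("", serialized).value();
    }
}
```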



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on pull request #13703: MINOR: Standardize controller log4j output for replaying records

2023-07-14 Thread via GitHub


jolshan commented on PR #13703:
URL: https://github.com/apache/kafka/pull/13703#issuecomment-1636124780

   Guys -- we really need to check builds before merging. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13947: KAFKA-15130: Delete remote segments when delete a topic

2023-07-14 Thread via GitHub


junrao commented on code in PR #13947:
URL: https://github.com/apache/kafka/pull/13947#discussion_r1263928100


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 }
 }
 
+public void cleanupDeletedRemoteLogSegments() {
+if (isCancelled())
+return;
+
+Uuid topicId = topicIdPartition.topicId();
+if (deletedTopicIds.contains(topicId)) {
+cleanupAllRemoteLogSegments();
+cancelRLMtask();
+deletedTopicIds.remove(topicId);
+}
+}
+
+private void cleanupAllRemoteLogSegments() {
+if (!isLeader())

Review Comment:
   Hmm, after a topic is deleted, there is no leader.



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1159,6 +1161,9 @@ class LogManager(logDirs: Seq[File],
   checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
 }
 addLogToBeDeleted(removedLog)
+if (deleteRemote && removedLog.remoteLogEnabled())

Review Comment:
   It's still weird to reference remoteLogEnabled in LogManager since it only 
manages local data.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -526,14 +532,17 @@ class ReplicaManager(val config: KafkaConfig,
   /**
* Stop the given partitions.
*
-   * @param partitionsToStopA map from a topic partition to a boolean 
indicating
-   *whether the partition should be deleted.
+   * @param partitionsToStopA map from a topic partition to a 
boolean indicating
+   *whether the partition should be 
deleted.
+   * @param partitionsMaybeToDeleteRemote   A set of topic partitions that may 
need to delete
+   *remote segments.
*
-   * @returnA map from partitions to exceptions which 
occurred.
-   *If no errors occurred, the map will be empty.
+   * @returnA map from partitions to 
exceptions which occurred.
+   *If no errors occurred, the map 
will be empty.
*/
   protected def stopPartitions(
-partitionsToStop: Map[TopicPartition, Boolean]
+partitionsToStop: Map[TopicPartition, Boolean],

Review Comment:
   Will the KRaft support be added in 3.6.0?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -556,6 +562,46 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 }
 }
 
+public void cleanupDeletedRemoteLogSegments() {
+if (isCancelled())
+return;
+
+Uuid topicId = topicIdPartition.topicId();
+if (deletedTopicIds.contains(topicId)) {
+cleanupAllRemoteLogSegments();
+cancelRLMtask();
+deletedTopicIds.remove(topicId);
+}
+}
+
+private void cleanupAllRemoteLogSegments() {

Review Comment:
   This is the case that I am thinking about. A topic is being deleted and the 
controller marks the topic as deleted. The remote segment deletion is in 
progress but not completed. There is a power outage and the whole cluster is 
down. After the cluster is restarted, since topic deletion won't be triggered 
again, the remaining remote segments for the deleted topic won't be cleaned. 
The local storage seems to have a similar issue right now since it's also 
deleted asynchronously. I am just wondering how we plan to address this new 
issue.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -500,11 +504,13 @@ class ReplicaManager(val config: KafkaConfig,
   // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
   // This could happen when topic is being deleted while broker is 
down and recovers.
   stoppedPartitions += topicPartition -> deletePartition
+  if (remoteLogManager.isDefined)
+partitionsMaybeToDeleteRemote += topicPartition

Review Comment:
   Hmm, `allPartitions` only stores the partitions with a replica in this 
broker. So, `HostedPartition.None` only means that the partition doesn't reside 
in this broker, but the partition could still exist in other brokers in the 
cluster.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


cmccabe commented on PR #13643:
URL: https://github.com/apache/kafka/pull/13643#issuecomment-1636107554

   > Regarding the failure case where the end offset of a batch is not equal to 
what the controller expected, will this only happen if a raft election occurred 
that was not initiated by a resignation? What are the circumstances when this 
can happen? Raft timeouts?
   >
   > IIUC, when an election happens in the middle of an atomic batch, the batch 
will be lost anyways. The node will finish writing the batch to the local log 
at epoch N, then process the new leader at N+1, and then it will truncate its 
own log once it fetches from the new leader at N+1 and sees the start offset 
for the epoch is less than its own end offset. Is that about right?
   
   There's been some discussion of adding more Raft internal control records. 
One example is if we wanted to implement a dynamic change-of-quorum mechanism. 
There would probably be internal Raft records associated with that. It's not 
clear whether change-of-quorum events would also always involve a leader change 
-- I think in some cases they would not.
   
   Like I said earlier, if we end up adding more background raft messages, we 
might introduce some mechanism for the active controller to get an "offset 
lock" so it can get an offset, replay the records, and then try to commit them 
under that lock. That would minimize failovers caused by these background 
messages. But since they don't exist today, we can avoid that for today.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-14 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1263925546


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1243,4 +1406,1283 @@ public static String 
consumerGroupSessionTimeoutKey(String groupId, String membe
 public static String consumerGroupRevocationTimeoutKey(String groupId, 
String memberId) {
 return "revocation-timeout-" + groupId + "-" + memberId;
 }
+
+ /** Replays GroupMetadataKey/Value to update the soft state of
+ * the generic group.
+ *
+ * @param key   A GroupMetadataKey key.
+ * @param value A GroupMetadataValue record.
+ */
+public void replay(
+GroupMetadataKey key,
+GroupMetadataValue value
+) {
+String groupId = key.group();
+
+if (value == null)  {
+// Tombstone. Group should be removed.
+groups.remove(groupId);
+} else {
+List loadedMembers = new ArrayList<>();
+for (GroupMetadataValue.MemberMetadata member : value.members()) {
+int rebalanceTimeout = member.rebalanceTimeout() == -1 ?
+member.sessionTimeout() : member.rebalanceTimeout();
+
+JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+supportedProtocols.add(new JoinGroupRequestProtocol()
+.setName(value.protocol())
+.setMetadata(member.subscription()));
+
+GenericGroupMember loadedMember = new GenericGroupMember(
+member.memberId(),
+Optional.ofNullable(member.groupInstanceId()),
+member.clientId(),
+member.clientHost(),
+rebalanceTimeout,
+member.sessionTimeout(),
+value.protocolType(),
+supportedProtocols,
+member.assignment()
+);
+
+loadedMembers.add(loadedMember);
+}
+
+String protocolType = value.protocolType();
+
+GenericGroup genericGroup = new GenericGroup(
+this.logContext,
+groupId,
+loadedMembers.isEmpty() ? EMPTY : STABLE,
+time,
+value.generation(),
+protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+Optional.ofNullable(value.protocol()),
+Optional.ofNullable(value.leader()),
+value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+);
+
+loadedMembers.forEach(member -> {
+genericGroup.add(member, null);
+log.info("Loaded member {} in group {} with generation {}.",
+member.memberId(), groupId, genericGroup.generationId());
+});
+
+genericGroup.setSubscribedTopics(
+genericGroup.computeSubscribedTopics()
+);
+}
+}
+
+/**
+ * Handle a JoinGroupRequest.
+ *
+ * @param context The request context.
+ * @param request The actual JoinGroup request.
+ *
+ * @return The result that contains records to append if the join group 
phase completes.
+ */
+public CoordinatorResult genericGroupJoin(
+RequestContext context,
+JoinGroupRequestData request,
+CompletableFuture responseFuture
+) {
+CoordinatorResult result = EMPTY_RESULT;
+
+String groupId = request.groupId();
+String memberId = request.memberId();
+int sessionTimeoutMs = request.sessionTimeoutMs();
+
+if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+);
+} else {
+boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+// Group is created if it does not exist and the member id is 
UNKNOWN. if member
+// is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+GenericGroup group;
+boolean isNewGroup = !groups.containsKey(groupId);
+try {
+group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+} catch (Throwable t) {
+responseFuture.complete(new JoinGroupResponseData()
+.setMemberId(memberId)
+.setErrorCode(Errors.forException(t).code())
+);
+return EMPTY_RESULT;
+}
+
+if (!acceptJoiningMember(group

[GitHub] [kafka] philipnee commented on pull request #13990: KAFKA-14937: Refactoring for client code to reduce boilerplate

2023-07-14 Thread via GitHub


philipnee commented on PR #13990:
URL: https://github.com/apache/kafka/pull/13990#issuecomment-1636098137

   Thanks @kirktrue - I think this makes the existing code much cleaner.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13949: KAFKA-15141: init logger statically on hot codepaths

2023-07-14 Thread via GitHub


divijvaidya commented on PR #13949:
URL: https://github.com/apache/kafka/pull/13949#issuecomment-1636095586

   We are having compilation problems in the latest CI run 
(https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13949/4/pipeline),
 which are fixed by https://github.com/apache/kafka/pull/14013.
   
   @gaurav-narula you will have to rebase onto trunk to get the CI stable for 
this PR.
   
   ```
   > Task :metadata:compileTestJava FAILED
   
   
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13949/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:2577:
 error: constructor ReplicationControlTestContext in class 
ReplicationControlTestContext cannot be applied to given types;
   
   ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15182) Normalize offsets before invoking SourceConnector::alterOffsets

2023-07-14 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15182:
--
Affects Version/s: 3.6.0

> Normalize offsets before invoking SourceConnector::alterOffsets
> ---
>
> Key: KAFKA-15182
> URL: https://issues.apache.org/jira/browse/KAFKA-15182
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Yash Mayya
>Assignee: Yash Mayya
>Priority: Major
> Fix For: 3.6.0
>
>
> See discussion 
> [here|https://github.com/apache/kafka/pull/13945#discussion_r1260946148]
>  
> TLDR: When users attempt to externally modify source connector offsets via 
> the {{PATCH /offsets}} endpoint (introduced in 
> [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]),
>  type mismatches can occur between offsets passed to 
> {{SourceConnector::alterOffsets}} and the offsets that are retrieved by 
> connectors / tasks via an instance of {{OffsetStorageReader }}after the 
> offsets have been modified. In order to prevent this type mismatch that could 
> lead to subtle bugs in connectors, we could serialize + deserialize the 
> offsets using the worker's internal JSON converter before invoking 
> {{{}SourceConnector::alterOffsets{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #14003: KAFKA-15182: Normalize source connector offsets before invoking SourceConnector::alterOffsets

2023-07-14 Thread via GitHub


C0urante merged PR #14003:
URL: https://github.com/apache/kafka/pull/14003


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13870: KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinator

2023-07-14 Thread via GitHub


jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1263905173


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -247,31 +324,78 @@ GroupMetadataManager build() {
  */
 private MetadataImage metadataImage;
 
+/**
+ * An empty result returned to the state machine. This means that
+ * there are no records to append to the log.
+ *
+ * Package private for testing.
+ */
+static final CoordinatorResult EMPTY_RESULT =
+new CoordinatorResult<>(Collections.emptyList(), 
CompletableFuture.completedFuture(null));

Review Comment:
   that forces the CoordinatorResult class to become non-generic which i don't 
think we want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13913: KAFKA-15119:Support incremental syncTopicAcls in MirrorSourceConnector

2023-07-14 Thread via GitHub


C0urante commented on PR #13913:
URL: https://github.com/apache/kafka/pull/13913#issuecomment-1636074235

   @hudeqi Sorry, I think there's a misunderstanding here. I'm not claiming 
that MM2 would be incapable of detecting changes in source cluster ACLs with 
this change; I'm worried that it would be unable to detect (or really, just 
overwrite) changes in target cluster ACLs, if they were made by a separate 
user/process from the MM2 cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #14012: KAFKA-15181: fix(storage): wait for consumer to be synced after assigning partitions

2023-07-14 Thread via GitHub


jeqo commented on PR #14012:
URL: https://github.com/apache/kafka/pull/14012#issuecomment-1636071026

   @showuon thanks for your feedback! I have applied most of the suggestions -- let me 
know if I'm missing anything.
   
   About the test, I followed @mdedetrich's advice on piggybacking on the existing 
testing of `waitTillConsumptionCatchesUp` and actually removed the waiting 
from `TopicBasedRemoteLogMetadataManagerTest` to prove the new behavior when 
assigning partitions. 
   
   I tried to add some scenarios to 
`TopicBasedRemoteLogMetadataManagerRestartTest`, but with the current thread 
initialization I haven't been able to reproduce the issue. Maybe something to 
improve in a follow-up PR; I think this one may be related: 
https://github.com/apache/kafka/pull/13689


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15191) Add support for Micrometer Observation

2023-07-14 Thread Marcin Grzejszczak (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15191?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marcin Grzejszczak updated KAFKA-15191:
---
Description: 
I'm a co-maintainer of Spring Cloud Sleuth and Micrometer projects (together 
with Tommy Ludwig and Jonatan Ivanov).

The idea of [Micrometer Observation|https://micrometer.io/docs/observation] is 
that you instrument code once but you get multiple benefits out of it - e.g. 
you can get tracing, metrics, logging or whatever you see fit.

I was curious if there's interest in adding Micrometer Observation support so 
that automatically metrics, spans could be created and tracing context 
propagation could happen too. In other words metrics and tracing of this 
project could be created + if there are Micrometer Observation compatible 
projects, then they will join the whole graph (e.g. Spring Framework 6, Apache 
Dubbo, Resilience4j, Apache Camel etc.).

If there's interest in adding that feature, I can provide a PR.

Regardless of whether there's interest in adding this directly to Kafka I would 
like to discuss what would be the best way to add instrumentation to Kafka. 
Adding instrumentation means before the message is sent to Kafka I would like 
to access its headers and be able to mutate them, and before the message is 
received from Kafka I would like to access the headers and retrieve its 
key-values to create e.g. a span.


  was:
I'm a co-maintainer of Spring Cloud Sleuth and Micrometer projects (together 
with Tommy Ludwig and Jonatan Ivanov).

The idea of [Micrometer Observation|https://micrometer.io/docs/observation] is 
that you instrument code once but you get multiple benefits out of it - e.g. 
you can get tracing, metrics, logging or whatever you see fit).

I was curious if there's interest in adding Micrometer Observation support so 
that automatically metrics, spans could be created and tracing context 
propagation could happen too. In other words metrics and tracing of this 
project could be created + if there are Micrometer Observation compatible 
projects, then they will join the whole graph (e.g. Spring Framework 6, Apache 
Dubbo, Resilience4j, Apache Camel etc.).

If there's interest in adding that feature, I can provide a PR.

Regardless of whether there's interest in adding this directly to Kafka I would 
like to discuss what would be the best way to add instrumentation to Kafka. 
Adding instrumentation means before the message is sent to Kafka I would like 
to access its headers and be able mutate them, and before the message is 
received from Kafka I would like to access the headers and retrieve its 
key-values to create e.g. a span.



> Add support for Micrometer Observation
> --
>
> Key: KAFKA-15191
> URL: https://issues.apache.org/jira/browse/KAFKA-15191
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Marcin Grzejszczak
>Priority: Major
>
> I'm a co-maintainer of Spring Cloud Sleuth and Micrometer projects (together 
> with Tommy Ludwig and Jonatan Ivanov).
> The idea of [Micrometer Observation|https://micrometer.io/docs/observation] 
> is that you instrument code once but you get multiple benefits out of it - 
> e.g. you can get tracing, metrics, logging or whatever you see fit.
> I was curious if there's interest in adding Micrometer Observation support so 
> that automatically metrics, spans could be created and tracing context 
> propagation could happen too. In other words metrics and tracing of this 
> project could be created + if there are Micrometer Observation compatible 
> projects, then they will join the whole graph (e.g. Spring Framework 6, 
> Apache Dubbo, Resilience4j, Apache Camel etc.).
> If there's interest in adding that feature, I can provide a PR.
> Regardless of whether there's interest in adding this directly to Kafka I 
> would like to discuss what would be the best way to add instrumentation to 
> Kafka. Adding instrumentation means before the message is sent to Kafka I 
> would like to access its headers and be able mutate them, and before the 
> message is received from Kafka I would like to access the headers and 
> retrieve its key-values to create e.g. a span.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15191) Add support for Micrometer Observation

2023-07-14 Thread Marcin Grzejszczak (Jira)
Marcin Grzejszczak created KAFKA-15191:
--

 Summary: Add support for Micrometer Observation
 Key: KAFKA-15191
 URL: https://issues.apache.org/jira/browse/KAFKA-15191
 Project: Kafka
  Issue Type: New Feature
Reporter: Marcin Grzejszczak


I'm a co-maintainer of Spring Cloud Sleuth and Micrometer projects (together 
with Tommy Ludwig and Jonatan Ivanov).

The idea of [Micrometer Observation|https://micrometer.io/docs/observation] is 
that you instrument code once but you get multiple benefits out of it - e.g. 
you can get tracing, metrics, logging or whatever you see fit).

I was curious if there's interest in adding Micrometer Observation support so 
that automatically metrics, spans could be created and tracing context 
propagation could happen too. In other words metrics and tracing of this 
project could be created + if there are Micrometer Observation compatible 
projects, then they will join the whole graph (e.g. Spring Framework 6, Apache 
Dubbo, Resilience4j, Apache Camel etc.).

If there's interest in adding that feature, I can provide a PR.

Regardless of whether there's interest in adding this directly to Kafka I would 
like to discuss what would be the best way to add instrumentation to Kafka. 
Adding instrumentation means before the message is sent to Kafka I would like 
to access its headers and be able to mutate them, and before the message is 
received from Kafka I would like to access the headers and retrieve its 
key-values to create e.g. a span.
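
To make the header-access part concrete, one possible integration point today is 
the existing client interceptor API. The sketch below is purely illustrative 
(the trace-id handling is a placeholder, not a proposed design); it only relies 
on the existing ProducerInterceptor extension point and record headers:

```java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

public class TracingProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Mutate headers before the record is serialized and sent; a real
        // implementation would pull the trace context from the observation library.
        String traceId = UUID.randomUUID().toString();
        record.headers().add("traceparent", traceId.getBytes(StandardCharsets.UTF_8));
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // A span covering the send could be finished here.
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

On the consuming side, a ConsumerInterceptor could read the same header in 
onConsume to restore the context and start a processing span.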




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14462) New Group Coordinator State Machine

2023-07-14 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-14462.
-
Fix Version/s: 3.6.0
   Resolution: Fixed

> New Group Coordinator State Machine
> ---
>
> Key: KAFKA-14462
> URL: https://issues.apache.org/jira/browse/KAFKA-14462
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.6.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac merged pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-14 Thread via GitHub


dajac merged PR #13991:
URL: https://github.com/apache/kafka/pull/13991


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-14 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263866826


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/util/SystemTimerReaperTest.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+public class SystemTimerReaperTest {
+private static class FutureTimerTask extends TimerTask {
+CompletableFuture future = new CompletableFuture<>();
+
+FutureTimerTask(long delayMs) {
+super(delayMs);
+}
+
+@Override
+public void run() {
+// We use org.apache.kafka.common.errors.TimeoutException to 
differentiate
+// from java.util.concurrent.TimeoutException.
+future.completeExceptionally(new TimeoutException(
+String.format("Future failed to be completed before timeout of 
%sMs ms was reached", delayMs)));
+}
+}
+
+private  CompletableFuture add(Timer timer, long delayMs) {
+FutureTimerTask task = new FutureTimerTask<>(delayMs);
+timer.add(task);
+return task.future;
+}
+
+@Test
+public void testReaper() throws Exception {
+Timer timer = new SystemTimerReaper("reaper", new 
SystemTimer("timer"));
+try {
+CompletableFuture t1 = add(timer, 100L);
+CompletableFuture t2 = add(timer, 200L);
+CompletableFuture t3 = add(timer, 300L);
+TestUtils.assertFutureThrows(t1, TimeoutException.class);

Review Comment:
   `TestUtils.assertFutureThrows(t1, TimeoutException.class)`
   I interpreted this to mean that the timeout exception was already thrown. I 
thought this would only happen after the timeout of 100-300 ms. I guess this is 
not the case and I didn't understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13991: KAFKA-14462; [23/23] Wire GroupCoordinatorService in BrokerServer

2023-07-14 Thread via GitHub


jolshan commented on code in PR #13991:
URL: https://github.com/apache/kafka/pull/13991#discussion_r1263865677


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/util/SystemTimerReaper.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.util;
+
+import org.apache.kafka.server.util.ShutdownableThread;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
+
+/**
+ * SystemTimerReaper wraps a {@link Timer} and starts a reaper thread
+ * to expire the tasks in the {@link Timer}.
+ */
+public class SystemTimerReaper implements Timer {
+private static final long WORK_TIMEOUT_MS = 200L;

Review Comment:
   Thanks. I didn't mean to imply that so I'm sorry it came across that way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13643: MINOR: provide the exact offset to QuorumController.replay

2023-07-14 Thread via GitHub


mumrah commented on code in PR #13643:
URL: https://github.com/apache/kafka/pull/13643#discussion_r1263851139


##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.

Review Comment:
   Should we mention that this will increment the error metric?



##
metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java:
##
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+/**
+ * True if this exception should be treated as a fault.
+ */
+private final boolean isFault;
+
+/**
+ * True if this exception should cause a controller failover.
+ * All faults cause failover
+ */
+private final boolean causesFailover;
+
+/**
+ * The internal exception.
+ */
+private final Throwable internalException;
+
+/**
+ * The exception to present to RPC callers, or Optional.empty if the 
internal exception should
+ * be presented directly.
+ */
+private final Optional externalException;
+
+/**
+ * Create an EventHandlerExceptionInfo object from an internal exception.
+ *
+ * @param internal  The internal exception.
+ * @param latestControllerSupplier  A function we can call to obtain the 
latest leader id.
+ *
+ * @return  The new immutable info object.
+ */
+public static EventHandlerExceptionInfo fromInternal(
+Throwable internal,
+Supplier latestControllerSupplier
+) {
+if (internal instanceof ApiException) {
+// This exception is a standard API error response from the 
controller, which can pass
+ 

[GitHub] [kafka] divijvaidya merged pull request #14016: MINOR: Update licenses for classgraph & plexus-utils

2023-07-14 Thread via GitHub


divijvaidya merged PR #14016:
URL: https://github.com/apache/kafka/pull/14016


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] riedelmax commented on pull request #13941: KAFKA-15123: Add tests for ChunkedBytesStream

2023-07-14 Thread via GitHub


riedelmax commented on PR #13941:
URL: https://github.com/apache/kafka/pull/13941#issuecomment-1636006523

   Oops, overlooked that. Thanks for pointing it out. I will fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13975: KAFKA-15161: Fix InvalidReplicationFactorException at connect startup

2023-07-14 Thread via GitHub


mumrah commented on code in PR #13975:
URL: https://github.com/apache/kafka/pull/13975#discussion_r1263821011


##
core/src/main/scala/kafka/server/MetadataCache.scala:
##
@@ -109,6 +109,8 @@ trait MetadataCache {
   def getRandomAliveBrokerId: Option[Int]
 
   def features(): Features
+
+  def isInitialized(): Boolean

Review Comment:
   Since KRaft brokers have the concept of fencing and metadata write requests 
are forwarded to the controller, this isn't really necessary. Can we make this 
method ZkMetadataCache-only?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13941: KAFKA-15123: Add tests for ChunkedBytesStream

2023-07-14 Thread via GitHub


divijvaidya commented on PR #13941:
URL: https://github.com/apache/kafka/pull/13941#issuecomment-1635980015

   Hey @riedelmax, 
   the comment at https://github.com/apache/kafka/pull/13941/files#r1258587583 is not 
resolved yet. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] riedelmax commented on pull request #13941: KAFKA-15123: Add tests for ChunkedBytesStream

2023-07-14 Thread via GitHub


riedelmax commented on PR #13941:
URL: https://github.com/apache/kafka/pull/13941#issuecomment-1635974772

   @divijvaidya Please have a look at my changes resolving your comments :)
   
   Also, there are test failures on the CI and some test failures in my 
local environment. However, I cannot see any connection to my 
changes. What can I do to resolve this?
   
   Do I have to rebase to upstream/trunk and force-push before this can be 
merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim opened a new pull request, #14017: KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator

2023-07-14 Thread via GitHub


jeffkbkim opened a new pull request, #14017:
URL: https://github.com/apache/kafka/pull/14017

   Built on top of https://github.com/apache/kafka/pull/13870, this PR 
implements the SyncGroup API.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya opened a new pull request, #14016: MINOR: Update licenses for classgraph & plexus-utils

2023-07-14 Thread via GitHub


divijvaidya opened a new pull request, #14016:
URL: https://github.com/apache/kafka/pull/14016

   - ClassGraph is not used anymore and has also been removed from trunk.
   - Plexus was incorrectly set to version 3.3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13969: KAFKA-15154: Acquire lock when reading checkQuotas

2023-07-14 Thread via GitHub


divijvaidya commented on code in PR #13969:
URL: https://github.com/apache/kafka/pull/13969#discussion_r1263677741


##
clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java:
##
@@ -366,4 +368,60 @@ public void 
testUpdatingMetricConfigIsReflectedInTheSensor() {
 
 metrics.close();
 }
+
+@Test
+public void testConcurrentReadWriteAccessForMetrics() throws 
ExecutionException, InterruptedException {
+final Metrics metrics = new Metrics(new 
MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+.timeWindow(1, TimeUnit.MILLISECONDS)
+.samples(100));
+final Sensor sensor = metrics.sensor("sensor");
+
+final int metricCount = 1;
+// Add a large number of metrics to increase the execution time of 
checkQuotas
+for (int i = 0; i < metricCount; i++) {
+sensor.add(metrics.metricName("test-metric" + i, "test-group" + 
i), new Rate());
+}
+sensor.record(10, 1, false);
+CountDownLatch latch = new CountDownLatch(1);
+ExecutorService executor = Executors.newFixedThreadPool(2);
+
+// Use non-thread-safe methods for concurrent read and write of 
metrics.
+Future worker = executor.submit(() -> {
+try {
+sensor.checkQuotasNonThreadSafe(3);
+return null;
+} catch (Throwable e) {
+return e;
+}
+});
+executor.submit(() -> {
+sensor.add(metrics.metricName("test-metric-non-thread-safe", 
"test-group-non-thread-safe"), new Rate());
+latch.countDown();
+});
+
+assertTrue(latch.await(5, TimeUnit.SECONDS), "If this failure happen 
frequently, we can try to increase the wait time");
+assertTrue(worker.isDone());
+assertNotNull(worker.get());
+assertEquals(ConcurrentModificationException.class, 
worker.get().getClass());
+sensor.record(10, 1, false);
+
+// Use thread-safe methods for concurrent read and write of metrics.
+worker = executor.submit(() -> {
+try {
+sensor.checkQuotas(3);
+return null;
+} catch (Throwable e) {
+return e;
+}
+});
+executor.submit(() -> {
+sensor.add(metrics.metricName("test-metric-thread-safe", 
"test-group-thread-safe"), new Rate());
+});
+
+executor.shutdown();
+assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+assertTrue(worker.isDone(), "If this failure happen frequently, we can 
try to increase the wait time");
+assertNull(worker.get(), "Sensor#checkQuotas SHOULD be thread-safe!");
+executor.shutdownNow();

Review Comment:
   in a try/finally please so that if an assertion fails, we still close the 
executor. Same for other closeable objects such as `Metrics`.
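   For illustration, a rough sketch of the try/finally shape being asked for (the test body is elided; the Metrics and ExecutorService setup mirrors the diff above):
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   import org.apache.kafka.common.metrics.Metrics;
   import org.junit.jupiter.api.Test;
   
   public class SensorCleanupSketch {
       @Test
       public void testConcurrentReadWriteAccessForMetrics() throws Exception {
           final Metrics metrics = new Metrics();
           final ExecutorService executor = Executors.newFixedThreadPool(2);
           try {
               // ... submit the concurrent readers/writers and run the assertions from the diff ...
           } finally {
               // Runs even when an assertion above fails, so neither threads nor metrics leak.
               executor.shutdownNow();
               metrics.close();
           }
       }
   }
   ```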



##
clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java:
##
@@ -366,4 +368,60 @@ public void 
testUpdatingMetricConfigIsReflectedInTheSensor() {
 
 metrics.close();
 }
+
+@Test
+public void testConcurrentReadWriteAccessForMetrics() throws 
ExecutionException, InterruptedException {
+final Metrics metrics = new Metrics(new 
MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE))
+.timeWindow(1, TimeUnit.MILLISECONDS)
+.samples(100));
+final Sensor sensor = metrics.sensor("sensor");
+
+final int metricCount = 1;
+// Add a large number of metrics to increase the execution time of 
checkQuotas
+for (int i = 0; i < metricCount; i++) {
+sensor.add(metrics.metricName("test-metric" + i, "test-group" + 
i), new Rate());
+}
+sensor.record(10, 1, false);
+CountDownLatch latch = new CountDownLatch(1);
+ExecutorService executor = Executors.newFixedThreadPool(2);
+
+// Use non-thread-safe methods for concurrent read and write of 
metrics.
+Future worker = executor.submit(() -> {
+try {
+sensor.checkQuotasNonThreadSafe(3);
+return null;
+} catch (Throwable e) {
+return e;
+}
+});
+executor.submit(() -> {
+sensor.add(metrics.metricName("test-metric-non-thread-safe", 
"test-group-non-thread-safe"), new Rate());
+latch.countDown();
+});
+
+assertTrue(latch.await(5, TimeUnit.SECONDS), "If this failure happen 
frequently, we can try to increase the wait time");
+assertTrue(worker.isDone());
+assertNotNull(worker.get());
+assertEquals(ConcurrentModificationException.class, 
worker.get().getClass());

Review Comment:
   @kamalcph has a valid point. We are relying on the assumption that iterating 
through the metrics takes a long time by setting metric count to 1000. In 
faster machines, this

[jira] [Commented] (KAFKA-13965) Document broker-side socket-server-metrics

2023-07-14 Thread Kiriakos Marantidis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743178#comment-17743178
 ] 

Kiriakos Marantidis commented on KAFKA-13965:
-

Hello [~djameson] , are you working on this issue? If not I would like to give 
it a try.

> Document broker-side socket-server-metrics
> --
>
> Key: KAFKA-13965
> URL: https://issues.apache.org/jira/browse/KAFKA-13965
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 3.2.0
>Reporter: James Cheng
>Priority: Major
>  Labels: newbie, newbie++
>
> There are a bunch of broker JMX metrics in the "socket-server-metrics" space 
> that are not documented on kafka.apache.org/documentation
>  
>  * {_}MBean{_}: 
> kafka.server:{{{}type=socket-server-metrics,listener=,networkProcessor={}}}
>  ** From KIP-188: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks]
>  *  
> kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener=\{listenerName}
>  ** From KIP-612: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
> It would be helpful to get all the socket-server-metrics documented
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] chb2ab closed pull request #11799: KAFKA-13688: Fix a couple incorrect metrics in KafkaController

2023-07-14 Thread via GitHub


chb2ab closed pull request #11799: KAFKA-13688: Fix a couple incorrect metrics 
in KafkaController
URL: https://github.com/apache/kafka/pull/11799


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-14 Thread via GitHub


hudeqi commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1263593759


##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##
@@ -70,4 +91,9 @@ public Thread newThread(Runnable r) {
 }
 
 }
+
+public void removeMetrics() {

Review Comment:
   Hi, I am planning to clean up all the leaked metrics in the current code base, 
so I am paying close attention to anything metric-related. Is it possible to 
follow the template [here](https://github.com/apache/kafka/pull/13962)? It is 
better to put the metric names in a collection to prevent others from forgetting 
to remove them when adding a new metric; otherwise, my metric-cleanup work in this 
area may never end. @abhijeetk88 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-14 Thread via GitHub


divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1263633383


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -894,6 +920,7 @@ public void close() {
 Utils.closeQuietly(indexCache, "RemoteIndexCache");
 
 rlmScheduledThreadPool.close();
+removeMetrics();

Review Comment:
   This should probably be done in a try/finally after the thread pool (in the 
next line) has been shutdown.
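   A minimal sketch of the suggested ordering (field and method names are taken from the diff above; the rest of close() is elided):
   
   ```java
   public void close() {
       try {
           Utils.closeQuietly(indexCache, "RemoteIndexCache");
           rlmScheduledThreadPool.close();
       } finally {
           // Metrics are removed even if shutting down the thread pool throws.
           removeMetrics();
       }
   }
   ```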



##
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##
@@ -41,6 +44,19 @@ public String logPrefix() {
 return "[" + Thread.currentThread().getName() + "]";
 }
 }.logger(RemoteStorageThreadPool.class);
+KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup(this.getClass());
+metricsGroup.newGauge(metricsNamePrefix.concat("TaskQueueSize"), new 
Gauge() {
+@Override
+public Integer value() {
+return RemoteStorageThreadPool.this.getQueue().size();

Review Comment:
   More than accuracy, I was concerned about the impact of having two different 
threads accessing a non-thread safe data structure. In some cases, it could 
leave the structure in an inconsistent state. However, unlike complex 
structures like Map, seems like this queue's implementation is simple and 
size() just does a `AtomicInteger.get`. 
   
   Hence, my concern is mitigated. Please consider this comment resolved.



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
 BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
 BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
 BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+BrokerTopicStats.RemoteCopyBytesPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteCopyBytesPerSec, "bytes"),

Review Comment:
   Please note that for some other metrics, we store aggregated topic stats 
using `allTopicsStats`. Are we intentionally not adding the remote* metrics to it?



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -589,6 +687,47 @@ void testIdempotentClose() throws IOException {
 inorder.verify(remoteLogMetadataManager, times(1)).close();
 }
 
+@Test
+public void testRemoveMetricsOnClose() {
+MockedConstruction mockMetricsGroupCtor = 
mockConstruction(KafkaMetricsGroup.class);
+try {
+RemoteLogManager remoteLogManager = new 
RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
+time, tp -> Optional.of(mockLog), brokerTopicStats) {
+public RemoteStorageManager createRemoteStorageManager() {
+return remoteStorageManager;
+}
+
+public RemoteLogMetadataManager 
createRemoteLogMetadataManager() {
+return remoteLogMetadataManager;
+}
+};
+// Close RemoteLogManager so that metrics are removed
+remoteLogManager.close();
+
+KafkaMetricsGroup mockRlmMetricsGroup = 
mockMetricsGroupCtor.constructed().get(0);
+KafkaMetricsGroup mockThreadPoolMetricsGroup = 
mockMetricsGroupCtor.constructed().get(1);
+
+List remoteLogManagerMetricNames = 
Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+List remoteStorageThreadPoolMetricNames = Arrays.asList(

Review Comment:
   When we add a metric to the RemoteLogManager or to the thread pool, we will 
have to update this test. Alternatively, may I suggest the pattern we have 
used in other places, i.e. create a list of metrics in the class where the metric 
group is present, and verify invocations in the test for all members of that 
list. With this pattern, you won't have to modify the test at all when adding 
new metrics.
   
   As an example, you can see  
https://github.com/apache/kafka/blob/b3ce2e54f40f2d1e287d8bfd196dc5dcdbd2046d/core/src/main/scala/kafka/log/LogCleaner.scala#L529
 
   
   Separately, I also like the pattern of metric decoupling used by 
`QuorumControllerMetrics`. It neatly encapsulates all QuorumController metrics 
in one place, and we could potentially do something similar for RemoteLogManager. 
This is a suggestion, so feel free to ignore it.
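   For illustration, a hypothetical sketch of that pattern (class, constant, and metric names are assumptions, not taken from the PR); the registration side is omitted and the KafkaMetricsGroup usage mirrors the diff above:
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Set;
   
   import org.apache.kafka.server.metrics.KafkaMetricsGroup;
   
   public class RemoteLogMetricsSketch {
       // Single source of truth for every metric this class registers.
       public static final String TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
       public static final Set<String> ALL_METRIC_NAMES = Collections.unmodifiableSet(
               new HashSet<>(Arrays.asList(TASKS_AVG_IDLE_PERCENT)));
   
       private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(RemoteLogMetricsSketch.class);
   
       public void removeMetrics() {
           // A test can iterate ALL_METRIC_NAMES and verify removeMetric() was invoked for
           // each entry, so a newly added metric cannot silently skip cleanup.
           ALL_METRIC_NAMES.forEach(metricsGroup::removeMetric);
       }
   }
   ```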



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -951,6 +970,10 @@ public Thread newThread(Runnable r) {
 return threadPool;
 }
 
+public Double getIdlePercent() {
+return 1 - (double) scheduledThreadPool.getActiveCount() / 
(double) schedu

[GitHub] [kafka] yashmayya commented on a diff in pull request #13948: KAFKA-15091: Fix misleading Javadoc for SourceTask::commit

2023-07-14 Thread via GitHub


yashmayya commented on code in PR #13948:
URL: https://github.com/apache/kafka/pull/13948#discussion_r1263626035


##
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:
##
@@ -105,9 +105,11 @@ public void initialize(SourceTaskContext context) {
 public abstract List poll() throws InterruptedException;
 
 /**
- * 
- * Commit the offsets, up to the offsets that have been returned by {@link 
#poll()}. This
- * method should block until the commit is complete.
+ * This method is invoked periodically when offsets are committed for this 
source task. Note that the offsets
+ * being committed won't necessarily correspond to the latest offsets 
returned by this source task via
+ * {@link #poll()}. When exactly-once support is disabled, offsets are 
committed periodically and asynchronously
+ * (i.e. on a separate thread from the one which calls {@link #poll()}). 
When exactly-once support is enabled,
+ * offsets are committed on transaction commits (also see {@link 
TransactionBoundary}).

Review Comment:
   > I don't love how we're outlining differences in behavior when exactly-once 
support is enabled/disabled; it adds to the cognitive load and may tempt 
connector developers to write connectors that are designed to work exclusively 
with one mode or the other.
   
   > Could it be enough to leave this bit out and rely on the "Note that the 
offsets being committed won't necessarily correspond to the latest offsets 
returned by this source task via poll" part?
   
   Yeah, sounds good 👍 
   
   It's always a delicate balance between providing enough information to be 
useful and providing so much that it's overwhelming / confusing 😅 
   
   > We can also refer people to SourceTask::commitRecord for fine-grained 
tracking of records (though there's also no guarantee that all records that 
have been ack'd in that method will have their offsets committed before a call 
to SourceTask::commit).
   
I've added this reference and avoided mentioning offsets commit guarantees. 
I guess we could explicitly mention the lack of guarantee but again it's a 
balancing act between providing information and introducing confusion...



##
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:
##
@@ -105,9 +105,11 @@ public void initialize(SourceTaskContext context) {
 public abstract List poll() throws InterruptedException;
 
 /**
- * 
- * Commit the offsets, up to the offsets that have been returned by {@link 
#poll()}. This
- * method should block until the commit is complete.
+ * This method is invoked periodically when offsets are committed for this 
source task. Note that the offsets
+ * being committed won't necessarily correspond to the latest offsets 
returned by this source task via
+ * {@link #poll()}. When exactly-once support is disabled, offsets are 
committed periodically and asynchronously
+ * (i.e. on a separate thread from the one which calls {@link #poll()}). 
When exactly-once support is enabled,
+ * offsets are committed on transaction commits (also see {@link 
TransactionBoundary}).

Review Comment:
   > I don't love how we're outlining differences in behavior when exactly-once 
support is enabled/disabled; it adds to the cognitive load and may tempt 
connector developers to write connectors that are designed to work exclusively 
with one mode or the other.
   
   > Could it be enough to leave this bit out and rely on the "Note that the 
offsets being committed won't necessarily correspond to the latest offsets 
returned by this source task via poll" part?
   
   Yeah, sounds good 👍 
   
   It's always a delicate balance between providing enough information to be 
useful and providing so much that it's overwhelming / confusing 😅 
   
   > We can also refer people to SourceTask::commitRecord for fine-grained 
tracking of records (though there's also no guarantee that all records that 
have been ack'd in that method will have their offsets committed before a call 
to SourceTask::commit).
   
I've added this reference and avoided mentioning offsets commit guarantees. 
I guess we could explicitly mention the lack of guarantee but again it's a 
balancing act between providing information and introducing confusion...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #14005: KAFKA-15177: Implement KIP-875 SourceConnector::alterOffset API in MirrorMaker 2 connectors

2023-07-14 Thread via GitHub


yashmayya commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1263619000


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java:
##
@@ -46,4 +51,49 @@ public void startClusters() throws Exception {
 super.startClusters();
 }
 
+@Override
+@Test
+public void testReplication() throws Exception {
+super.testReplication();
+
+// Augment the base replication test case with some extra testing of 
the offset management
+// API introduced in KIP-875
+// We do this only when exactly-once support is enabled in order to 
avoid having to worry about
+// zombie tasks producing duplicate records and/or writing stale 
offsets to the offsets topic
+
+String backupTopic1 = remoteTopicName("test-topic-1", 
PRIMARY_CLUSTER_ALIAS);
+String backupTopic2 = remoteTopicName("test-topic-2", 
PRIMARY_CLUSTER_ALIAS);
+
+// Explicitly move back to offset 0
+// Note that the connector treats the offset as the last-consumed 
offset,
+// so it will start reading the topic partition from offset 1 when it 
resumes
+alterMirrorMakerSourceConnectorOffsets(backup, n -> 0L, 
"test-topic-1");
+// Reset the offsets for test-topic-2
+resetSomeMirrorMakerSourceConnectorOffsets(backup, "test-topic-2");
+resumeMirrorMakerConnectors(backup, MirrorSourceConnector.class);
+
+int expectedRecordsTopic1 = NUM_RECORDS_PRODUCED + 
((NUM_RECORDS_PER_PARTITION - 1) * NUM_PARTITIONS);
+assertEquals(expectedRecordsTopic1, 
backup.kafka().consume(expectedRecordsTopic1, RECORD_TRANSFER_DURATION_MS, 
backupTopic1).count(),
+"Records were not re-replicated to backup cluster after 
altering offsets.");
+int expectedRecordsTopic2 = NUM_RECORDS_PER_PARTITION * 2;
+assertEquals(expectedRecordsTopic2, 
backup.kafka().consume(expectedRecordsTopic2, RECORD_TRANSFER_DURATION_MS, 
backupTopic2).count(),
+"New topic was not re-replicated to backup cluster after 
altering offsets.");
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+Class[] connectorsToReset = 
CONNECTOR_LIST.toArray(new Class[0]);
+// Resetting the offsets for the heartbeat and checkpoint connectors 
doesn't have any effect
+// on their behavior, but users may want to wipe offsets from them to 
prevent the offsets topic
+// from growing infinitely. So, we include them in the list of 
connectors to reset as a sanity check

Review Comment:
   > (As noted in the comment here) Users may want to tombstone offsets for 
these connectors to prevent the offsets topic from growing infinitely
   
   Since the set of source partitions is limited here, shouldn't log compaction 
be good enough?
   
   > We have no way of knowing if the set of offsets given to us is the total 
set or not, so we can't choose to only allow total resets instead of partial 
resets
   
   Yeah, that makes sense.
   
   > Since partial resets become possible, it seems reasonable to allow users 
to "undo" a partial reset by re-submitting an offset for a given partition
   
   If these offsets are never read back and actually used, why would users want 
to "undo" partial or complete resets?
   
   > Also, I think the UnsupportedOperationException case was more intended for 
when offsets are managed externally, not for when it doesn't seem to make sense 
to modify them (because, e.g., they're never read back by the connector or its 
tasks).
   >
   > So with all this in mind, I figured it'd be best to allow offsets to be 
modified by users, but only if they match the format of the offsets that the 
connector's tasks already emit. Thoughts?
   
   Hm, I still feel like if offset modification isn't really doing anything 
here it might be more intuitive to simply disallow it rather than informing 
users that the offsets have been modified successfully but with no actual 
side-effect. However, I don't think too many users are going to be attempting 
to modify the offsets for `MirrorCheckpointConnector` / 
`MirrorHeartbeatConnector` and expecting any change in their behavior so I'm 
not too fussed about this either way.
   
   > As an aside... I'm wondering if we should abandon all kinds of validation 
when the proposed partition/offset pair is a tombstone. Just in case the 
offsets topic somehow ends up with garbage in it, it'd be nice to allow users 
to clean it up via the REST API, and if there's no existing partition/offset 
pair in the offsets topic for the given partition already, then emitting a 
tombstone effectively becomes a no-op
   
   Hm this is an interesting one. I think if the offsets topic ends up with 
garbage in it through an external producer, users could always clean it up 
using the same external producer. If we don't let users write garbage using the 
offsets REST API,

[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13464: MINOR: Keep FileOffset opened only the needed time

2023-07-14 Thread via GitHub


vamossagar12 commented on code in PR #13464:
URL: https://github.com/apache/kafka/pull/13464#discussion_r1263605246


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java:
##
@@ -73,36 +73,40 @@ public synchronized void stop() {
 
 @SuppressWarnings("unchecked")
 private void load() {
+Object obj = null;
+
 try (SafeObjectInputStream is = new 
SafeObjectInputStream(Files.newInputStream(file.toPath( {
-Object obj = is.readObject();
-if (!(obj instanceof HashMap))
-throw new ConnectException("Expected HashMap but found " + 
obj.getClass());
-Map raw = (Map) obj;
-data = new HashMap<>();
-for (Map.Entry mapEntry : raw.entrySet()) {
-ByteBuffer key = (mapEntry.getKey() != null) ? 
ByteBuffer.wrap(mapEntry.getKey()) : null;
-ByteBuffer value = (mapEntry.getValue() != null) ? 
ByteBuffer.wrap(mapEntry.getValue()) : null;
-data.put(key, value);
-OffsetUtils.processPartitionKey(mapEntry.getKey(), 
mapEntry.getValue(), keyConverter, connectorPartitions);
-}
+obj = is.readObject();
 } catch (NoSuchFileException | EOFException e) {

Review Comment:
   Could there be a case where `NoSuchFileException` and `EOFException` were 
previously no-ops, but now `load` could throw a `ConnectException` because `obj` 
would be null when it proceeds with the rest of the logic?
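   To illustrate the concern, a rough sketch (class and field names follow the diff; the early return is an assumed fix, not something in the PR) of how the old no-op behaviour could be kept:
   
   ```java
   @SuppressWarnings("unchecked")
   private void load() {
       Object obj;
       try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
           obj = is.readObject();
       } catch (NoSuchFileException | EOFException e) {
           // Missing or empty file: keep the previous behaviour and leave `data` untouched.
           return;
       } catch (IOException | ClassNotFoundException e) {
           throw new ConnectException("Error loading offsets", e);
       }
       if (!(obj instanceof HashMap))
           throw new ConnectException("Expected HashMap but found " + obj.getClass());
       // ... wrap the entries into ByteBuffers and populate `data` as in the original method ...
   }
   ```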



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #13464: MINOR: Keep FileOffset opened only the needed time

2023-07-14 Thread via GitHub


vamossagar12 commented on code in PR #13464:
URL: https://github.com/apache/kafka/pull/13464#discussion_r1263605246


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java:
##
@@ -73,36 +73,40 @@ public synchronized void stop() {
 
 @SuppressWarnings("unchecked")
 private void load() {
+Object obj = null;
+
 try (SafeObjectInputStream is = new 
SafeObjectInputStream(Files.newInputStream(file.toPath( {
-Object obj = is.readObject();
-if (!(obj instanceof HashMap))
-throw new ConnectException("Expected HashMap but found " + 
obj.getClass());
-Map raw = (Map) obj;
-data = new HashMap<>();
-for (Map.Entry mapEntry : raw.entrySet()) {
-ByteBuffer key = (mapEntry.getKey() != null) ? 
ByteBuffer.wrap(mapEntry.getKey()) : null;
-ByteBuffer value = (mapEntry.getValue() != null) ? 
ByteBuffer.wrap(mapEntry.getValue()) : null;
-data.put(key, value);
-OffsetUtils.processPartitionKey(mapEntry.getKey(), 
mapEntry.getValue(), keyConverter, connectorPartitions);
-}
+obj = is.readObject();
 } catch (NoSuchFileException | EOFException e) {

Review Comment:
   Could there be a case where `NoSuchFileException` and `EOFException` were 
previously no-ops, but now `load` could throw a `ConnectException` because `obj` 
would be null when it proceeds with the rest of the logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #14015: KAFKA-15129;[8/N] Remove metrics in KafkaRequestHandlerPool when broker shutdown

2023-07-14 Thread via GitHub


hudeqi commented on PR #14015:
URL: https://github.com/apache/kafka/pull/14015#issuecomment-1635686388

   And please help review this new one as well when you have time. @divijvaidya 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #14012: KAFKA-15181: fix(storage): wait for consumer to be synced after assigning partitions

2023-07-14 Thread via GitHub


jeqo commented on code in PR #14012:
URL: https://github.com/apache/kafka/pull/14012#discussion_r1263576266


##
storage/src/test/resources/log4j.properties:
##
@@ -19,4 +19,4 @@ log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
 log4j.logger.org.apache.kafka.server.log.remote.storage=INFO
-log4j.logger.org.apache.kafka.server.log.remote.metadata.storage=INFO
+log4j.logger.org.apache.kafka.server.log.remote.metadata.storage=DEBUG

Review Comment:
   Enabled it because it was hard to troubleshoot test issues without it; also 
only enabled on tests. Though I'm ok reverting it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi opened a new pull request, #14015: KAFKA-15129;[8/N] Remove metrics in KafkaRequestHandlerPool when broker shutdown

2023-07-14 Thread via GitHub


hudeqi opened a new pull request, #14015:
URL: https://github.com/apache/kafka/pull/14015

   This PR removes the metrics in `KafkaRequestHandlerPool` when the broker 
shuts down.
   It has passed the corresponding unit test, and it is part of 
[KAFKA-15129](https://issues.apache.org/jira/browse/KAFKA-15129).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo commented on a diff in pull request #14012: KAFKA-15181: fix(storage): wait for consumer to be synced after assigning partitions

2023-07-14 Thread via GitHub


jeqo commented on code in PR #14012:
URL: https://github.com/apache/kafka/pull/14012#discussion_r1263570124


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -78,45 +88,42 @@ public void startConsumerThread() {
 /**
  * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
  *
- * @param recordMetadata record metadata to be checked for consumption.
  * @throws TimeoutException if this method execution did not complete with 
in the wait time configured with
  *  property {@code 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP}.
  */
-public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) 
throws TimeoutException {
-waitTillConsumptionCatchesUp(recordMetadata, 
rlmmConfig.consumeWaitMs());
+public void waitTillConsumptionCatchesUp(int partition, long offset) 
throws TimeoutException {
+waitTillConsumptionCatchesUp(partition, offset, 
rlmmConfig.consumeWaitMs());
 }
 
 /**
  * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
  *
- * @param recordMetadata record metadata to be checked for consumption.
- * @param timeoutMs  wait timeout in milli seconds
+ * @param timeoutMs wait timeout in milliseconds
  * @throws TimeoutException if this method execution did not complete with 
in the given {@code timeoutMs}.
  */
-public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
- long timeoutMs) throws 
TimeoutException {
-final int partition = recordMetadata.partition();
+public void waitTillConsumptionCatchesUp(int partition, long offset, long 
timeoutMs) throws TimeoutException {
 final long consumeCheckIntervalMs = 
Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
 
 // If the current assignment does not have the subscription for this 
partition then return immediately.
 if (!consumerTask.isPartitionAssigned(partition)) {
-throw new KafkaException("This consumer is not subscribed to the 
target partition " + partition + " on which message is produced.");
+throw new KafkaException("This consumer is not subscribed to the 
target partition " + partition + " assigned. " +
+"Partitions currently assigned: " + 
consumerTask.partitionsAssigned());

Review Comment:
   Sure, let me rephrase that to 
   ```
   throw new KafkaException("This consumer is not assigned to the 
target partition " + partition + ". " +
   "Partitions currently assigned: " + 
consumerTask.metadataPartitionsAssigned());
   
   ```
   as partitions are not technically subscribed, but manually assigned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #14000: [MINOR] Fixing comment with IncrementalCooperativeAssignor#handleLostAssignments

2023-07-14 Thread via GitHub


vamossagar12 commented on PR #14000:
URL: https://github.com/apache/kafka/pull/14000#issuecomment-1635636681

   Hey Chris, I tagged you for this minor PR since you have context around 
these changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15172) Allow exact mirroring of ACLs between clusters

2023-07-14 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743080#comment-17743080
 ] 

Mickael Maison commented on KAFKA-15172:


Yes feel free to work on this item if you want. Thanks

> Allow exact mirroring of ACLs between clusters
> --
>
> Key: KAFKA-15172
> URL: https://issues.apache.org/jira/browse/KAFKA-15172
> Project: Kafka
>  Issue Type: Task
>  Components: mirrormaker
>Reporter: Mickael Maison
>Assignee: hudeqi
>Priority: Major
>  Labels: needs-kip
>
> When mirroring ACLs, MirrorMaker downgrades allow ALL ACLs to allow READ. The 
> rationale to is prevent other clients to produce to remote topics. 
> However in disaster recovery scenarios, where the target cluster is not used 
> and just a "hot standby", it would be preferable to have exactly the same 
> ACLs on both clusters to speed up failover.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15189) Do not initialize RemoteStorage related metrics when disabled at cluster

2023-07-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-15189:
-

Assignee: Luke Chen

> Do not initialize RemoteStorage related metrics when disabled at cluster
> 
>
> Key: KAFKA-15189
> URL: https://issues.apache.org/jira/browse/KAFKA-15189
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> context: https://github.com/apache/kafka/pull/13944#discussion_r1251099820



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon commented on a diff in pull request #14012: KAFKA-15181: fix(storage): wait for consumer to be synced after assigning partitions

2023-07-14 Thread via GitHub


showuon commented on code in PR #14012:
URL: https://github.com/apache/kafka/pull/14012#discussion_r1263477975


##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -78,45 +88,42 @@ public void startConsumerThread() {
 /**
  * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
  *
- * @param recordMetadata record metadata to be checked for consumption.

Review Comment:
   Please remember to update the javadoc.



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -78,45 +88,42 @@ public void startConsumerThread() {
 /**
  * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
  *
- * @param recordMetadata record metadata to be checked for consumption.
  * @throws TimeoutException if this method execution did not complete with 
in the wait time configured with
  *  property {@code 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP}.
  */
-public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata) 
throws TimeoutException {
-waitTillConsumptionCatchesUp(recordMetadata, 
rlmmConfig.consumeWaitMs());
+public void waitTillConsumptionCatchesUp(int partition, long offset) 
throws TimeoutException {
+waitTillConsumptionCatchesUp(partition, offset, 
rlmmConfig.consumeWaitMs());
 }
 
 /**
  * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
  *
- * @param recordMetadata record metadata to be checked for consumption.
- * @param timeoutMs  wait timeout in milli seconds
+ * @param timeoutMs wait timeout in milliseconds
  * @throws TimeoutException if this method execution did not complete with 
in the given {@code timeoutMs}.

Review Comment:
   javadoc needs to be updated.



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -139,6 +147,33 @@ public void close() throws IOException {
 
 public void addAssignmentsForPartitions(Set partitions) {
 consumerTask.addAssignmentsForPartitions(partitions);
+waitTillMetaPartitionIsInSync();
+}
+
+/**
+ * Whenever there is a change on the meta partitions assgined to the 
TBRLMM,

Review Comment:
   Could we make this clearer? e.g.:
   `Whenever there is a change in the remote log metadata topic partitions 
assigned to the topic-based remote log metadata manager,`



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -139,6 +147,33 @@ public void close() throws IOException {
 
 public void addAssignmentsForPartitions(Set partitions) {
 consumerTask.addAssignmentsForPartitions(partitions);
+waitTillMetaPartitionIsInSync();
+}
+
+/**
+ * Whenever there is a change on the meta partitions assgined to the 
TBRLMM,
+ * wait for consumer (therefore the cache as well) to be in-sync before 
continuing
+ */
+private void waitTillMetaPartitionIsInSync() {
+try {
+final Set topicPartitions = 
consumerTask.partitionsAssigned();
+if (!topicPartitions.isEmpty()) {
+final Map listOffsetsRequest = 
topicPartitions.stream()
+.collect(Collectors.toMap(Function.identity(), tp -> 
OffsetSpec.latest()));
+final Map latestOffsets = 
adminClient.listOffsets(listOffsetsRequest)
+.all()
+.get(rlmmConfig.consumeWaitMs(), 
TimeUnit.MILLISECONDS); // piggybacking on existing timeout
+
+for (TopicPartition tp : topicPartitions) {
+waitTillConsumptionCatchesUp(
+tp.partition(),
+latestOffsets.get(tp).offset() - 1 // as latest 
offset is the next latest written record
+);
+}
+}
+} catch (ExecutionException | InterruptedException | TimeoutException 
e) {
+log.error("Error encountered while consuming from meta 
partitions", e);

Review Comment:
   Should we throw exception here?
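   For illustration, a minimal sketch of the alternative being asked about (the helper name is hypothetical; only the catch block differs from the diff above):
   
   ```java
   private void waitTillMetaPartitionIsInSync() {
       try {
           syncWithLatestOffsets(); // hypothetical helper wrapping the listOffsets + catch-up wait from the diff
       } catch (ExecutionException | InterruptedException | TimeoutException e) {
           // Propagate instead of only logging, so callers of addAssignmentsForPartitions
           // learn that the cache may not be in sync yet.
           throw new KafkaException("Error while waiting for the metadata partitions to catch up", e);
       }
   }
   
   // Hypothetical helper, declared to throw the exceptions handled above.
   private void syncWithLatestOffsets() throws ExecutionException, InterruptedException, TimeoutException {
       // ... listOffsets + waitTillConsumptionCatchesUp logic from the diff ...
   }
   ```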



##
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java:
##
@@ -78,45 +88,42 @@ public void startConsumerThread() {
 /**
  * Waits if necessary for the consumption to reach the offset of the given 
{@code recordMetadata}.
  *
- * @param recordMetadata record metadata to be checked for consumption.
  * @throws TimeoutException if this method execution did not complete with 
in the wait time configured with
  *  property {@code 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_CON

[jira] [Created] (KAFKA-15190) Allow configuring a streams process ID

2023-07-14 Thread Joe Wreschnig (Jira)
Joe Wreschnig created KAFKA-15190:
-

 Summary: Allow configuring a streams process ID
 Key: KAFKA-15190
 URL: https://issues.apache.org/jira/browse/KAFKA-15190
 Project: Kafka
  Issue Type: Wish
  Components: streams
Reporter: Joe Wreschnig


We run our Kafka Streams applications in containers with no persistent storage, 
and therefore the mitigation of persisting the process ID in the state directory 
(KAFKA-10716) does not help us avoid shuffling lots of tasks during restarts.

However, we do have a persistent container ID (from a Kubernetes StatefulSet). 
Would it be possible to expose a configuration option to let us set the streams 
process ID ourselves?

We are already using this ID as our group.instance.id - would it make sense to 
have the process ID be automatically derived from this (plus application/client 
IDs) if it's set? The two IDs seem to have overlapping goals of identifying 
"this consumer" across restarts.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-14 Thread via GitHub


divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1263465710


##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -61,15 +74,20 @@ public void testRemoteLogReaderWithoutError() throws 
RemoteStorageException, IOE
 assertFalse(actualRemoteLogReadResult.error.isPresent());
 assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
 assertEquals(fetchDataInfo, 
actualRemoteLogReadResult.fetchDataInfo.get());
+
+// Verify metrics for remote reads are updated correctly
+assertEquals(1, 
brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+assertEquals(100, 
brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+assertEquals(0, 
brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
 }
 
 @Test
 public void testRemoteLogReaderWithError() throws RemoteStorageException, 
IOException {
-when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new 
OffsetOutOfRangeException("error"));
+when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new 
RuntimeException("error"));

Review Comment:
   ok 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13837: KAFKA-9564: Local Tiered Storage implementation for Remote Storage Manager

2023-07-14 Thread via GitHub


showuon commented on code in PR #13837:
URL: https://github.com/apache/kafka/pull/13837#discussion_r1263259011


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/Transferer.java:
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * The implementation of the transfer of the data of the canonical segment and 
index files to
+ * this storage. The only reason the "transferer" abstraction exists is to be 
able to simulate
+ * file copy errors and exercise the associated failure modes.

Review Comment:
   I didn't see the `simulate file copy errors` in this PR, which should be 
expected, right? Those will appear in the real test cases, right?



##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageListener.LocalTieredStorageListeners;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.String.format;
+import static java.nio.file.Files.newInputStream;
+import static java.nio.file.StandardOpenOption.READ;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_PARTITION;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_PRODUCER_SNAPSHOT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentFileset.RemoteLogSegmentFileType.LEADER_EPOCH_CHECKPOINT;
+import static 
org.apache.kafka.server.log.remote

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13944: KAFKA-14953: Add tiered storage related metrics

2023-07-14 Thread via GitHub


divijvaidya commented on code in PR #13944:
URL: https://github.com/apache/kafka/pull/13944#discussion_r1263464332


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -277,6 +277,11 @@ class BrokerTopicMetrics(name: Option[String]) {
 BrokerTopicStats.TotalFetchRequestsPerSec -> 
MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
 BrokerTopicStats.FetchMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
 BrokerTopicStats.ProduceMessageConversionsPerSec -> 
MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+BrokerTopicStats.RemoteBytesOutPerSec -> 
MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"),

Review Comment:
   I wasn't able to find the JIRA, so I went ahead and created 
https://issues.apache.org/jira/browse/KAFKA-15189. This JIRA is a blocker for 
the 3.6 launch; otherwise it will lead to a regression for customers not using 
the Remote Storage feature.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15189) Do not initialize RemoteStorage related metrics when disabled at cluster

2023-07-14 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15189:
-
Fix Version/s: 3.6.0

> Do not initialize RemoteStorage related metrics when disabled at cluster
> 
>
> Key: KAFKA-15189
> URL: https://issues.apache.org/jira/browse/KAFKA-15189
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: KIP-405
> Fix For: 3.6.0
>
>
> context: https://github.com/apache/kafka/pull/13944#discussion_r1251099820



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15189) Do not initialize RemoteStorage related metrics when disabled at cluster

2023-07-14 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15189:
-
Labels: KIP-405  (was: )

> Do not initialize RemoteStorage related metrics when disabled at cluster
> 
>
> Key: KAFKA-15189
> URL: https://issues.apache.org/jira/browse/KAFKA-15189
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Divij Vaidya
>Priority: Blocker
>  Labels: KIP-405
>
> context: https://github.com/apache/kafka/pull/13944#discussion_r1251099820



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15189) Do not initialize RemoteStorage related metrics when disabled at cluster

2023-07-14 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-15189:


 Summary: Do not initialize RemoteStorage related metrics when 
disabled at cluster
 Key: KAFKA-15189
 URL: https://issues.apache.org/jira/browse/KAFKA-15189
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.6.0
Reporter: Divij Vaidya


context: https://github.com/apache/kafka/pull/13944#discussion_r1251099820



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] fvaleri commented on pull request #13562: KAFKA-14581: Moving GetOffsetShell to tools

2023-07-14 Thread via GitHub


fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1635504373

   Hi @ruslankrivoshein, thanks for adding the warning message.
   
   Not sure if this is ready for another review, but I still see some issues.
   
   The checkstyle phase is still failing; I suggested a way to fix it in one of 
my previous comments.
   
   I don't see the stacktrace anymore in case of wrong options, but the error 
messages are not exactly the same as before. In general, we should match the 
old behavior when reporting errors.
   
   ```sh
   # before
   $ bin/kafka-get-offsets.sh --topic my-topic --time -3
   Error occurred: Missing required option(s) [bootstrap-server]
   
   # now
   $ bin/kafka-get-offsets.sh --topic my-topic --time -3
   Missing required option(s) [bootstrap-server]
   Option   Description 
  
   --   --- 
  
   --bootstrap-serverin the form 
HOST1:PORT1,HOST2:PORT2.
   --broker-list  instead; ignored if --bootstrap-  
  
  server is specified. The 
server(s)  
  to connect to in the form HOST1:  
  
  PORT1,HOST2:PORT2. 
   ...
   
   # before
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets 
--time -1 --exclude-internal-topics
   Error occurred: Could not match any topic-partitions with the specified 
filters
   
   # now
   $ bin/kafka-get-offsets.sh --broker-list :9092 --topic __consumer_offsets 
--time -1 --exclude-internal-topics
   Could not match any topic-partitions with the specified filters
   ```
   
   Finally, the `tests/kafkatest/tests/core/get_offset_shell_test.py` system 
test fails because you need to also update the GetOffsetShell package in 
`tests/kafkatest/services/kafka/kafka.py`. You can run this test like this:
   
   ```sh
   ./gradlew clean systemTestLibs
   TC_PATHS="tests/kafkatest/tests/core/get_offset_shell_test.py" bash 
tests/docker/run_tests.sh
   ```
   
   Hope it helps. 
   Let me know when you are ready for another round of review.
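   
   For illustration, a rough sketch of matching the old error reporting (the class and the run() entry point are hypothetical, not from the PR):
   
   ```java
   import org.apache.kafka.common.utils.Exit;
   
   public final class GetOffsetShellErrorHandlingSketch {
       public static void main(String[] args) {
           try {
               run(args);
           } catch (Exception e) {
               // Match the old tool: a single concise line on stderr, no usage dump.
               System.err.println("Error occurred: " + e.getMessage());
               Exit.exit(1);
           }
       }
   
       // Hypothetical entry point doing the option parsing and offset lookup.
       private static void run(String[] args) throws Exception {
           // ... parse options, validate, query offsets ...
       }
   }
   ```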
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15180) Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-14 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-15180:
-
Fix Version/s: 3.6.0

> Generalize integration tests to change use of KafkaConsumer to Consumer
> ---
>
> Key: KAFKA-15180
> URL: https://issues.apache.org/jira/browse/KAFKA-15180
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-945
> Fix For: 3.6.0
>
>
> For the consumer threading refactor project, we're introducing a new 
> implementation of the {{Consumer}} interface. However, most of the instances 
> in the integration tests specifically use the concrete implementation 
> {{{}KafkaConsumer{}}}. This task is to generalize those uses where possible 
> to use the {{Consumer}} interface.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya merged pull request #13997: KAFKA-15180: Generalize integration tests to change use of KafkaConsumer to Consumer

2023-07-14 Thread via GitHub


divijvaidya merged PR #13997:
URL: https://github.com/apache/kafka/pull/13997


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya merged pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-14 Thread via GitHub


divijvaidya merged PR #13956:
URL: https://github.com/apache/kafka/pull/13956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13956: MINOR: Remove thread leak from ConsumerBounceTest

2023-07-14 Thread via GitHub


divijvaidya commented on PR #13956:
URL: https://github.com/apache/kafka/pull/13956#issuecomment-1635486526

   Thank you for the review, folks. I will merge the thread leak part now.
   
   @showuon I am not sure whether `controller-event-thread` was leaked because of `daemon-bounce-broker` or not. But looking at the logs, `Gradle Test Executor 177` was working happily until `testConsumptionWithBrokerFailures` failed, and after that it started failing every test it ran. Hence, there are two mysteries: who leaked `controller-event-thread`, and does it have a correlation with `testConsumptionWithBrokerFailures`?
   
   Let's merge this one in and observe whether we see more such large batches of test failures. That will provide some more details.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] see-quick commented on pull request #14011: MINOR: Fix MiniKdc Java 17 issue in system tests

2023-07-14 Thread via GitHub


see-quick commented on PR #14011:
URL: https://github.com/apache/kafka/pull/14011#issuecomment-1635479753

   > Thanks @see-quick. Did you verify that the test passes with this change?
   
   I have verified the tests that use MiniKdc. With this change, they pass on Java 17.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15181) Race condition on partition assigned to TopicBasedRemoteLogMetadataManager

2023-07-14 Thread Jorge Esteban Quilcate Otoya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Esteban Quilcate Otoya updated KAFKA-15181:
-
Parent: KAFKA-7739
Issue Type: Sub-task  (was: Bug)

> Race condition on partition assigned to TopicBasedRemoteLogMetadataManager 
> ---
>
> Key: KAFKA-15181
> URL: https://issues.apache.org/jira/browse/KAFKA-15181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Jorge Esteban Quilcate Otoya
>Priority: Major
>  Labels: tiered-storage
>
> TopicBasedRemoteLogMetadataManager (TBRLMM) uses a cache so that it is prepared 
> whenever partitions are assigned.
> When partitions are assigned to the TBRLMM instance, a consumer is started to 
> keep the cache up to date.
> If the cache has not finished building, TBRLMM fails to return remote 
> metadata about partitions that are stored on the backing topic, and it may not 
> recover from this failing state.
> A proposal to fix this issue is to wait, after a partition is assigned, for 
> the consumer to catch up. Similar logic is already used when TBRLMM 
> writes to the topic: the send callback is used to wait for the consumer to catch up. 
> This logic can be reused whenever a partition is assigned, so that by the time TBRLMM is 
> marked as initialized, the cache is ready to serve requests.
> Reference: https://github.com/aiven/kafka/issues/33
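
For illustration only, the "wait for the consumer to catch up" step proposed above could look roughly like the sketch below. This is a minimal sketch under assumed names (`CatchUpBarrier`, `onRecordConsumed`, `awaitCatchUp` are hypothetical), not the actual TBRLMM code:

```java
import java.util.concurrent.TimeoutException;

public class CatchUpBarrier {

    private volatile long targetEndOffset = -1L; // end offset of the metadata partition, captured at assignment time
    private volatile long consumedOffset = -1L;  // last offset processed by the internal consumer

    // Called from the consumer poll loop after each metadata record is applied to the cache.
    public void onRecordConsumed(long offset) {
        consumedOffset = offset;
    }

    // Called when the backing metadata topic partition is assigned; endOffset would come
    // from an end-offset lookup (e.g. Consumer#endOffsets) done at assignment time.
    public void onPartitionAssigned(long endOffset) {
        targetEndOffset = endOffset;
    }

    // Block until the consumer has read up to the captured end offset, so the cache is
    // fully built before the manager reports itself as initialized.
    public void awaitCatchUp(long timeoutMs) throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (consumedOffset < targetEndOffset - 1) {
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException("Remote log metadata consumer did not catch up in time");
            }
            Thread.sleep(100);
        }
    }
}
```

The real fix would have to live in TBRLMM's partition-assignment path, but the idea is the same: do not mark the manager as initialized until the end offset captured at assignment time has been consumed.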



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] see-quick commented on a diff in pull request #14011: MINOR: Fix MiniKdc Java 17 issue in system tests

2023-07-14 Thread via GitHub


see-quick commented on code in PR #14011:
URL: https://github.com/apache/kafka/pull/14011#discussion_r1263405326


##
tests/kafkatest/services/security/minikdc.py:
##
@@ -107,6 +107,12 @@ def start_node(self, node):
 cmd = "for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_libs_jar
 cmd += " for file in %s; do CLASSPATH=$CLASSPATH:$file; done;" % core_dependant_test_libs_jar
 cmd += " export CLASSPATH;"
+# exports sun.security.krb5 avoiding Java 17 error:
+# Exception in thread "main" java.lang.IllegalAccessException:
+# class kafka.security.minikdc.MiniKdc cannot access class sun.security.krb5.Config
+# (in module java.security.jgss) because module java.security.jgss does not export
+# sun.security.krb5 to unnamed module @24959ca4

Review Comment:
   Okay, I will change it :) 
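
   For background: the `IllegalAccessException` quoted in the new comments is the standard JPMS access error, and a typical way to resolve it on Java 17 is to launch the JVM with `--add-exports java.security.jgss/sun.security.krb5=ALL-UNNAMED`; the exact export line this PR adds is not shown in the excerpt above.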



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mdedetrich commented on pull request #14012: KAFKA-15181: fix(storage): wait for consumer to be synced after assigning partitions

2023-07-14 Thread via GitHub


mdedetrich commented on PR #14012:
URL: https://github.com/apache/kafka/pull/14012#issuecomment-1635383122

   Pinging @satishd and @junrao since it's tiered storage related.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org