Re: [PR] MINOR: Minor update patch versions [kafka]

2023-10-25 Thread via GitHub


jolshan commented on PR #14641:
URL: https://github.com/apache/kafka/pull/14641#issuecomment-1780451891

   @satishd or @showuon -- did we notice these versions causing issues for the 
system tests for the 3.6 release?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-25 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1780387305

   Apparently, it is not very easy to validate if the server-side assignor is 
used with the local assignor option instantiating the config.  We should 
validate this when starting up the client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-25 Thread via GitHub


philipnee opened a new pull request, #14642:
URL: https://github.com/apache/kafka/pull/14642

   
   Protocol Name | Type | Default | Description
   -- | -- | -- | --
   group.protocol | enum | generic | A flag which indicates if the new protocol 
should be used or not. It could be: generic or consumer
   group.remote.assignor | string | null | The server side assignor to use. It 
cannot be used in conjunction with group.local.assignor. null means that the 
choice of the assignor is left to the group coordinator.
   group.local.assignors | list | empty | The list of client side (local) 
assignors as a list of full class names. It cannot be used in conjunction with 
group.remote.assignor.
   
   Three new configurations were added.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14419) Same message consumed again by the same stream task after partition is lost and reassigned

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779731#comment-17779731
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14419:


Hey, sorry for the long delay – I'm still trying to catch up my memory of this 
ticket and the related one, but after looking at it again with fresh eyes I 
think I figured out what's going on here. If I'm reading this situation 
correctly, it does seem like there is some less-than-ideal behavior that we 
might be able to improve. 

Based on your recent logs, I think the root cause here is basically the same as 
what I fixed in 
[https://github.com/apache/kafka/pull/12869|https://github.com/apache/kafka/pull/12869,],
 just to a lesser degree. The issue in that patch was that Streams would 
sometimes trigger a followup rebalance even while the current rebalance was 
still going on, which lead some members to drop out of the group upon hitting a 
REBALANCE_IN_PROGRESS error during the SyncGroup phase. The fix basically just 
made the StreamThread wait until the rebalance was over before triggering a 
followup.

This should have been sufficient, but I suppose it is still theoretically 
possible to run into the same issue. Taking a deeper look at the original 
issue, it would only arise because of how Streams uses a non-blocking poll 
which allows it to return to its main loop and continue processing in the 
background during a rebalance. A lot of things happen throughout the loop, but 
the relevant operations here are as such:
 # Check the rebalance "schedule" and trigger one if:
 ## it has been requested for a time equal to or less than the current time
 ## the consumer is not actively participating in a rebalance (ie sometime 
after a SyncGroup response is received but before sending a new JoinGroup 
request)
 # Poll for more records, during which time either or both of the following may 
occur:
 ## consumer enters a new rebalance by sending a JoinGroup request
 ## consumer participates in a rebalance by receiving the JoinGroup response 
and sending a SyncGroup request
 ## consumer completes an ongoing rebalance by receiving a SyncGroup response, 
after which it can commit offsets for revoked tasks and initialize new ones
 # Process more records, which might have been either:
 ## Newly-consumed during the last poll call, or
 ## Left over from a previous batch that could not be fully processed before 
needing to return to poll due to running out of time in the max.poll.interval

So here's what I'm imagining: let's say we have two consumer, A and B, with A 
being the group leader/assignor.
 # A new rebalance begins, and both threads send their JoinGroup requests 
before returning to process some records
 # A doesn't have many records left to process, so it quickly returns to the 
poll call in step 2 of the loop. However B is still processing a large backlog
 # A performs the assignment and determines that a followup rebalance is 
needed, so it sets the rebalance schedule to 
 # After the assignment, A sends it out in the SyncGroup request and exits the 
poll call
 # A does some processing (or not) before returning to the poll and receiving 
the SyncGroup response
 # A exits the poll again, and this time when it reaches step 1 of the loop, it 
is now able to trigger the new rebalance
 # After A has requested a new rebalance, it finally returns to the poll call 
one more time, and rejoins the group/sends a JoinGroup request to kick it off
 # This whole time, B has had a large backlog of records, or a very high 
max.poll.interval, or a long GC pause – you get the idea. It's stuck in step 3
 # B finally finishes processing and leaves step 3, returning to the poll call 
during which it sends a very late SyncGroup request.
 # When the SyncGroup response is eventually received, B gets the 
REBALANCE_IN_PROGRESS error and fails its rebalance since the generation is 
stale

The fundamental issue here is that B is theoretically able to spend up to the 
max.poll.interval between sending its SyncGroup request and returning to poll 
to process the SyncGroup response, but A might be able to process its SyncGroup 
response, process its records, and then trigger a new rebalance all in that 
timeframe. This could happen when the task assignment is heavily imbalanced, 
for example. 

I can see a few potential paths forward here, and a fourth option that is more 
of a temporary workaround for [~Carlstedt] if you're still encountering this. 
None of them are really a guarantee, but they would help. For the most 
comprehensive fix we might want to consider doing two or even all three of 
these:

Option 1: add a small delay to the Streams followup rebalance trigger to help 
the entire group finish the SyncGroup phase before beginning the next rebalance.

Option 2: set a shorter upper bound on the maximum time a StreamThread can 
spend processing records while there is an 

Re: [PR] KAFKA-15602: revert KAFKA-4852 [kafka]

2023-10-25 Thread via GitHub


mjsax commented on PR #14617:
URL: https://github.com/apache/kafka/pull/14617#issuecomment-1780343175

   @showuon Considering this for 3.5.2 bug-fix release. (Given that we wait for 
RocksDB testing to complete -- we are on it -- we could squeeze this one in, 
too?).
   
   Also \cc @guozhangwang who merge the original PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15602: revert KAFKA-4852 [kafka]

2023-10-25 Thread via GitHub


mjsax commented on PR #14617:
URL: https://github.com/apache/kafka/pull/14617#issuecomment-1780342224

   I would focusing on reverting -- if we want to update tests, we can do it in 
a separate PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15116.

Resolution: Not A Problem

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a messages, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15116) Kafka Streams processing blocked during rebalance

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779725#comment-17779725
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15116:


I tend to agree with Matthias here – this is definitely not a bug in Kafka 
Streams, and the behavior you describe in the most recent message sounds like 
it is working as intended. You can certainly disable cooperative rebalancing by 
setting the UPDATE_FROM config to something 2.3 or below, but as [~mjsax] 
mentioned, we plan to remove support for eager rebalancing entirely in an 
upcoming release so this would only be a temporary fix at best.

I say "at best" because it sounds to me like even without cooperative 
rebalancing, you're relying on implementation details that are not part of the 
public contract. That's not an accusation, it just means you're putting 
yourself at risk of breaking changes to your application, like cooperative 
rebalancing, because the semantics around when to commit are considered 
internal to Kafka Streams and can change at any time.

That said, I do agree that it would be nice if Streams had better support for 
blocking operations or long-running RPCs. Unfortunately that's just not the 
case at the moment, so you kind of have to take what you can get, or else 
rewrite as a plain producer/consumer app. Honestly, while I'm a big believer 
that almost any kind of application that follows the consume-process-produce 
pattern can and should be represented as a Streams app, I think this might be 
one of the few exceptions where it really does make more sense to implement 
with the plain clients. It sounds like part of the application logic is already 
driven by an external consumer, which together with the fact that you have 
strict requirements around committing as well as a shared state with external 
calls, indicates to me that Streams may not be the right tool for the job. Sort 
of by definition Streams is supposed to abstract away all the client 
interactions and all the offset commit logic, in addition to being shardable 
such that everything you need to process a stream of records for a given 
partition is local to that shard. So it's hard to imagine how to fit your 
application logic into a Streams app in a completely "safe" way.

Again, this doesn't mean you can't/shouldn't try to stretch the boundaries of 
Kafka Streams, but I think it makes sense to close this ticket as a bug given 
that everything is working as intended. However I'd encourage you to consider 
filing a "New Feature" ticket for some specific functionality that would help 
with your use case. I believe there is already one for better blocking API/RPC 
support, but I think the ability to pause processing in Kafka Streams would be 
nice to have in general, and could be used to pause Streams during a rebalance 
to solve the issue you're facing.

Hope that all makes sense! If you do want to consider rewriting this with the 
plain clients, I'm happy to give some pointers. It sounds like your Streams 
application is very complex already so I have to wonder if it might be more 
simple to write up outside the framework of Kafka Streams

> Kafka Streams processing blocked during rebalance
> -
>
> Key: KAFKA-15116
> URL: https://issues.apache.org/jira/browse/KAFKA-15116
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.0
>Reporter: David Gammon
>Priority: Major
>
> We have a Kafka Streams application that simply takes a messages, processes 
> it and then produces an event out the other side. The complexity is that 
> there is a requirement that all events with the same partition key must be 
> committed before the next message  is processed.
> This works most of the time flawlessly but we have started to see problems 
> during deployments where the first message blocks the second message during a 
> rebalance because the first message isn’t committed before the second message 
> is processed. This ultimately results in transactions timing out and more 
> rebalancing.
> We’ve tried lots of configuration to get the behaviour we require with no 
> luck. We’ve now put in a temporary fix so that Kafka Streams works with our 
> framework but it feels like this might be a missing feature or potentially a 
> bug.
> +Example+
> Given:
>  * We have two messages (InA and InB).
>  * Both messages have the same partition key.
>  * A rebalance is in progress so streams is no longer able to commit.
> When:
>  # Message InA -> processor -> OutA (not committed)
>  # Message InB -> processor -> blocked because #1 has not been committed



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-10-25 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779721#comment-17779721
 ] 

Philip Nee commented on KAFKA-15562:


Hi [~grigorii] - I haven't had a chance to find a ticket for you, but I 
published a PR for this issue.  would you like to provide some feedback?

 

https://github.com/apache/kafka/pull/14639

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Both fetchOffsetRequest and commitOffsetRequest handlers don't have 
> sufficient logic to handle timeout exception.
>  
> CommitOffsetRequest handler also doesn't handle various of server error such 
> as coordinator not found. We need to handle:
> If Exception is non null:
>  - handle RetriableError that respects requestTimeoutMs
>  - handle NonRetriableError
>  
> If the response contains error, ensure to:
>  - mark coordinator unknown if needed
>  - retry if needed
>  - fail the request



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Fix NPE during fetchSnapshot [kafka]

2023-10-25 Thread via GitHub


hudeqi commented on code in PR #14615:
URL: https://github.com/apache/kafka/pull/14615#discussion_r1372501784


##
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##
@@ -595,7 +595,7 @@ public void deleteSnapshotsBefore(long offset) throws 
IOException {
 }
 
 public Optional fetchSnapshot(long offset) {

Review Comment:
   Can we add an annotation to explain what situation may cause null to be 
obtained?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-25 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1372498997


##
metadata/src/main/java/org/apache/kafka/metadata/properties/PropertiesUtils.java:
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.properties;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Properties;
+
+public final class PropertiesUtils {
+/**
+ * Writes a Java Properties object to a file.
+ *
+ * @param props The Properties object.
+ * @param path  The file to write to.
+ * @throws IOException
+ */
+public static void writePropertiesFile(
+Properties props,
+String path
+) throws IOException {
+File tempFile = new File(path + ".tmp");
+try (
+FileOutputStream fos = new FileOutputStream(tempFile, false);
+OutputStreamWriter osw = new OutputStreamWriter(fos, 
StandardCharsets.UTF_8);
+PrintWriter pw = new PrintWriter(osw)
+) {
+props.store(pw, "");
+fos.flush();
+fos.getFD().sync();

Review Comment:
   If the process crashes  and we lose the data in the page cache (from say a 
power failure) after the rename but before the data of the file is on disk, 
then on some filesystems this would result in an empty target file. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12550.

Resolution: Won't Fix

Closing this out since it's usefulness is preempted by the StateUpdaterThread 
and having moved restoration out of the main StreamThread

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING even though it is generally 
> spending much more time restoring than rebalancing in most cases.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12550:
---
Fix Version/s: (was: 4.0.0)

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING even though it is generally 
> spending much more time restoring than rebalancing in most cases.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-15463.

Resolution: Not A Problem

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After some time application was working fine, starting to get:
>  
> This is springboot application runs in kubernetes as stateful pod.
>  
>  
>  
> {code:java}
>   Exception in thread 
> "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown 
> node at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>  at myclass1.java:28) at myclass2.java:48) at 
> java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at 
> java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>  at 
> java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>  at myclass3.java:48) at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>  at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>    {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> Transformer is Prototype bean, the supplier supplys new instance of the 
> Transformer:
>  
>  
> {code:java}
> @Override public Transformer> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by kafkastreams, even if 
> application restarted same exception is thrown.
> *If messages in internal topics of 'store-changelog'  are deleted/offset 
> manipulated, can it cause the issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779715#comment-17779715
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15463:


[~yevsh]  Well the intended pattern is to get the StateStore from the context 
during init and then save a reference to the store for your 
processor/transformer. That's why we pass the context in to #init but not to 
#process (also, while it's not a significant overhead, it's definitely more 
efficient to do the state store lookup only once, during init, and not on every 
single record that's processed)

That said, I think it's a fair question as to why this is causing an error, and 
why the error doesn't happen every time. The fact that it generally only 
appears when you have more than one partition/task per application instance 
makes me think there is some state that is somehow being shared between the 
different tasks. This would definitely be explained by returning the same 
transformer instance each time, but based on your latest update, you are 
definitely returning a new transformer each time and still seeing the issue, 
right?

Assuming so, I'm inclined to believe the issue is with Spring. I vaguely recall 
a similar problem being reported in the past, which was ultimately because a 
Spring object was unexpectedly/unknowingly acting as a static/singleton class. 
This was resulting in the context – of which there is supposed to be exactly 
one per task – being shared between different instances of the task/processor 
nodes.

I'm pretty sure that's what is going on here as well. I'm guessing that the 
MyService class is a Spring bean? If so, then it's effectively a singleton and 
will be shared by each of the individual Transformer instances in your 
application, meaning the different tasks will be overwriting each others 
context when invoking #init on this transformer. So the context you retrieve 
from the myServiceBean during #process may not be the same as the context you 
saved to it in #init, causing it to throw this error since only the context 
corresponding to the task that is currently being processed will have the 
currentNode set to a non-null value.

Even if you made the change I originally suggested but saved the StateStore 
reference by passing it to the MyService bean, it wouldn't work – it might not 
throw this error but you would potentially be reading and writing to the wrong 
copy of a state store for a given task, which is even worse. The only way to 
solve this is by removing the Spring bean entirely or at least refactoring it 
so that it doesn't hold any internal state and has to have the full application 
state for that task passed in to it every time – in other words you just need 
to make sure to keep all the objects used by a given transformer completely 
local to that instance. Here is my recommendation for how to implement your 
transformer class – hope this helps!


private final MYService myServiceBean;
private StateStore myStore;

@Overridepublic void init(ProcessorContext context) \{
    myStore = context.getStateStore(STORE_NAME);
}

@Overridepublic KeyValue transform(String key, MyItem myItem) \{
    myServiceBean.process(myItem, myStore);
}
Basically modify the MyService bean to accept the StateStore to operate on as a 
parameter to its #process method. And definitely keep the fix in which you 
return a new Transformer instance each time instead of reusing the same one.

Let me know if you have any questions! I'm going to close the ticket since I'm 
fairly confident in this solution having seen the same problem before, but 
definitely please do reopen it if you implement the suggested fix and still 
encounter an error. Good luck!

>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After some time application was working fine, starting to get:
>  
> This is springboot application runs in kubernetes as stateful pod.
>  
>  
>  
> {code:java}
>   Exception in thread 
> "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown 
> node at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>  at myclass1.java:28) at myclass2.java:48) at 
> java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at 
> java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>  at 
> java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>  at 
> 

Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372479451


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, clientSoftwareName: 
String,
+clientSoftwareVersion: String, clientSourceAddress: String, 
clientSourcePort: String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, clientSoftwareName, 
clientSoftwareVersion, clientSourceAddress, clientSourcePort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)

Review Comment:
   a cool thing you can do is pattern matching, you can match `x.split("=", 
2).map(x => x.trim)` with a pair `(k, v)` and direct the non-matching case 
`case _`
   
   This is what I meant
   ```
   pattern.split("=", 2).map(_.trim) match {
case (k, v) if isValidParam(k) && validRegexPattern(v) => patternMap + (k 
-> v)
case _ => throw new InvalidConfigurationException()
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372479451


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, clientSoftwareName: 
String,
+clientSoftwareVersion: String, clientSourceAddress: String, 
clientSourcePort: String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, clientSoftwareName, 
clientSoftwareVersion, clientSourceAddress, clientSourcePort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)

Review Comment:
   a cool thing you can do is pattern matching, you can match `x.split("=", 
2).map(x => x.trim)` with a pair `(k, v)` and direct the non-matching case 
`case _`
   
   This is what I meant
   ```
   pattern.split("=", 2).map(_.trim) match {
case (k, v) if isValidParam(k) && validRegexPattern(v) => (k, v)
case _ => throw new InvalidConfigurationException()
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15563: Provide informative error messages when Connect REST requests time out [kafka]

2023-10-25 Thread via GitHub


C0urante commented on PR #14562:
URL: https://github.com/apache/kafka/pull/14562#issuecomment-1780284540

   Okay, I've pushed a couple new commits that:
   - Introduce the notion of a completion time for a callback stage
   - Add granularity to the callback stages for the distributed herder's tick 
thread
   
   I know that this doesn't cover everything we discussed, but I did give the 
rest a try.
   
   I explored an approach where we defined broader tick thread stages 
(declaring them in `DistributedHerder:;tick` and not in methods it invokes, and 
following a similar approach for herder requests). This turned out to be 
infeasible because of the control flow during a rebalance, where the herder 
invokes `WorkerGroupMember::poll` or `WorkerGroupMember::ensureActive`, which 
in turn can end up invoking `DistributedHerder.RebalanceListener::onRevoked`, 
which in turn can perform operations that warrant a distinct tick stage from, 
e.g., "ensuring membership in the cluster".
   
   Instead, I've tried for an approach where the tick thread stages are defined 
as narrowly as possible, and only around operations that we can reasonably 
anticipate will block. This does slightly increase the odds of a stage being 
completed when a request times out, but since the information about that stage 
isn't lost anymore, the fallout from that scenario is limited.
   
   I also experimented with the `Supplier` approach to reduce the 
runtime complexity of stage tracking for herder requests, but found that this 
was more difficult to unit test. Instead of being able to track the set of all 
recorded stages for a callback, we would have to manually query the `Supplier` 
after each anticipated herder stage update, which is more work and can fail to 
collect some stages if not queried at the correct time (especially if it's too 
difficult to query at a specific point in time during a call to 
`DistributedHerder::tick`). Since we both agree that performance shouldn't be a 
concern here, I hope this is acceptable.
   
   I've also verified with three consecutive Jenkins runs that the new unit 
test should finally be flake-free.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372471968


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, clientSoftwareName: 
String,
+clientSoftwareVersion: String, clientSourceAddress: String, 
clientSourcePort: String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, clientSoftwareName, 
clientSoftwareVersion, clientSourceAddress, clientSourcePort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case _: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()

Review Comment:
   Do you need the init? If you don't, you can just use the primary class 
constructor:
   ```
   class ClientMetricsMetadata(clientInstanceId: String,
   clientId: String,
   clientSoftwareName: String,
   clientSoftwareVersion: String,
   clientSourceAddress: String,
   clientSourcePort: String) {
 private val attributesMap: mutable.Map[String, String] = mutable.Map(
   CLIENT_INSTANCE_ID -> clientInstanceId,
   CLIENT_ID -> clientId,
   CLIENT_SOFTWARE_NAME -> clientSoftwareName,
   CLIENT_SOFTWARE_VERSION -> clientSoftwareVersion,
   CLIENT_SOURCE_ADDRESS -> clientSourceAddress,
   CLIENT_SOURCE_PORT -> clientSourcePort
 )
   ```
   
   such that the apply method above can just be 
   
   ```
   def apply(...): ClientMetricsMetadata = {
   new ClientMetricsMetadata(...)
 }
   ```



-- 
This is an 

Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372471968


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, clientSoftwareName: 
String,
+clientSoftwareVersion: String, clientSourceAddress: String, 
clientSourcePort: String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, clientSoftwareName, 
clientSoftwareVersion, clientSourceAddress, clientSourcePort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case _: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()

Review Comment:
   Do you need the init? If you don't, you can just use the primary class 
constructor:
   ```
   private val attributesMap: mutable.Map[String, String] = mutable.Map(
   CLIENT_INSTANCE_ID -> clientInstanceId,
   CLIENT_ID -> clientId,
   CLIENT_SOFTWARE_NAME -> clientSoftwareName,
   CLIENT_SOFTWARE_VERSION -> clientSoftwareVersion,
   CLIENT_SOURCE_ADDRESS -> clientSourceAddress,
   CLIENT_SOURCE_PORT -> clientSourcePort
 )
   ```
   
   such that the apply method above can just be 
   
   ```
   def apply(...): ClientMetricsMetadata = {
   new ClientMetricsMetadata(...)
 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on PR #14621:
URL: https://github.com/apache/kafka/pull/14621#issuecomment-1780279268

   Hi @apoorvmittal10 Thanks for taking my suggestions.  I left a few more, 
mostly about the aesthetic aspect as scala is a pretty idiomatic language...
   
   Terribly sorry that I don't have a lot of knowledge around the server side, 
so I can't provide advice on detail implementation - but I think your PR is 
clean and I can almost fully understand the intention.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372471968


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, clientSoftwareName: 
String,
+clientSoftwareVersion: String, clientSourceAddress: String, 
clientSourcePort: String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, clientSoftwareName, 
clientSoftwareVersion, clientSourceAddress, clientSourcePort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case _: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()

Review Comment:
   Do you need the init? If you don't, you can just do this:
   ```
   private val attributesMap: mutable.Map[String, String] = mutable.Map(
   CLIENT_INSTANCE_ID -> clientInstanceId,
   CLIENT_ID -> clientId,
   CLIENT_SOFTWARE_NAME -> clientSoftwareName,
   CLIENT_SOFTWARE_VERSION -> clientSoftwareVersion,
   CLIENT_SOURCE_ADDRESS -> clientSourceAddress,
   CLIENT_SOURCE_PORT -> clientSourcePort
 )
   ```
   
   such that the apply method above can just be 
   
   ```
   def apply(...): ClientMetricsMetadata = {
   new ClientMetricsMetadata(...)
 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372467166


##
core/src/test/scala/kafka/metrics/ClientMetricsTestUtils.scala:
##
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import ClientMetricsConfig.ClientMatchingParams.{CLIENT_SOFTWARE_NAME, 
CLIENT_SOFTWARE_VERSION}
+
+import java.util.Properties
+
+object ClientMetricsTestUtils {
+  val defaultPushIntervalMs: Int = 30 * 1000 // 30 seconds

Review Comment:
   Do you want to make these constants camel starting with a capital letter 
like what you did in `ClientMetrics`? In general in scala, `Constant names 
should be in upper camel case` per scala documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-10-25 Thread via GitHub


ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1372463913


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
 // we manually override the state offset here prior to taking the snapshot.
 producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-producerStateManager.takeSnapshot()
+// We avoid potentially-costly fsync call, since we acquire 
UnifiedLog#lock here
+// which could block subsequent produces in the meantime.
+// flush is done in the scheduler thread along with segment flushing below
+val maybeSnapshot = producerStateManager.takeSnapshot(false)
 updateHighWatermarkWithLogEndOffset()
 // Schedule an asynchronous flush of the old segment
-scheduler.scheduleOnce("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
+scheduler.scheduleOnce("flush-log", () => {
+  maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, 
"producer-snapshot"))

Review Comment:
   Yeah, I also noticed that. I'll fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-25 Thread via GitHub


dongnuo123 commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1372463509


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -268,7 +269,9 @@ public 
CompletableFuture consumerGroupHeartb
 ConsumerGroupHeartbeatRequestData request

Review Comment:
   Not sure if I understand, should we specify in group coordinator interface 
or in group coordinator service? I made some change in the group coordinator 
interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372460806


##
core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * 
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * 
+ * {
+ * 
+ *  subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *  subscribedMetrics: List of metric prefixes
+ *  pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *  matchingPatternsList: List of client matching patterns, that are used 
by broker to match the client instance
+ * with the subscription.
+ * 
+ * }
+ * 
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * 
+ *  "metrics" value should be comma separated metrics list. A prefix match 
on the requested metrics
+ *  is performed in clients to determine subscribed metrics. An empty list 
means no metrics subscribed.
+ *  A list containing just an empty string means all metrics subscribed.
+ *  Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ *  "interval.ms" should be between 100 and 360 (1 hour). This is the 
interval at which the client
+ *  should push the metrics to the broker.
+ *
+ *  "match" is a comma separated list of client match patterns, in case if 
there is no matching
+ *  pattern specified then broker considers that as all match which means 
the associated metrics
+ *  applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *  which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ *
+ * 
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,

Review Comment:
   I think this is how you can write this:
   
   ```
   case class SubscriptionInfo(subscriptionId: String,
 subscribedMetrics: List[String],
 pushIntervalMs: Int,
 matchingPatternsList: List[String]) { 
   def clientMatchingPatterns: Map[String, String] = 
 ClientMetricsMetadata.parseMatchingPatterns(matchingPatternsList)
 }
   ```
   
   Does it accept an empty subscriptionId?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372457923


##
core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * 
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * 
+ * {
+ * 
+ *  subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *  subscribedMetrics: List of metric prefixes
+ *  pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *  matchingPatternsList: List of client matching patterns, that are used 
by broker to match the client instance
+ * with the subscription.
+ * 
+ * }
+ * 
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * 
+ *  "metrics" value should be comma separated metrics list. A prefix match 
on the requested metrics
+ *  is performed in clients to determine subscribed metrics. An empty list 
means no metrics subscribed.
+ *  A list containing just an empty string means all metrics subscribed.
+ *  Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ *  "interval.ms" should be between 100 and 360 (1 hour). This is the 
interval at which the client
+ *  should push the metrics to the broker.
+ *
+ *  "match" is a comma separated list of client match patterns, in case if 
there is no matching
+ *  pattern specified then broker considers that as all match which means 
the associated metrics
+ *  applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *  which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ *
+ * 
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,

Review Comment:
   I would recommend using scala `case class` here.  It provides you with the 
basic utils such as getter. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372452303


##
core/src/main/scala/kafka/server/ClientMetricsManager.scala:
##
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review Comment:
   Extra line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372451618


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port

Review Comment:
   ctx.clientAddress.getHostAddress, ctx.clientAddress.getHostAddress -> You 
are passing in two hostAddress, this perhaps is the TODO item?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Minor update patch versions [kafka]

2023-10-25 Thread via GitHub


jolshan opened a new pull request, #14641:
URL: https://github.com/apache/kafka/pull/14641

   We were missing the versions for these patch releases. The versions are in 
versions.py and Dockerfile.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-25 Thread via GitHub


jsancio commented on PR #14489:
URL: https://github.com/apache/kafka/pull/14489#issuecomment-1780207833

   @jolshan I started a new build.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372414818


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {

Review Comment:
   Could we update the javadoc for the new param?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372414239


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   Thanks for the explanation, Justine and Artem. This sounds good then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779695#comment-17779695
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13152:


Hey [~sagarrao] – I'm happy to try and help you get this into 3.7. As a 
disclaimer I do have a few other KIPs I've already promised to help land in 3.7 
so this would be 3rd on my list, but we still have a good amount of time and 
given the KIP itself is already accepted, I think we can make it.

Just give me a ping on the PR when it's ready for me to take a look. And just 
to refresh my memory, all the caching work – both config and metrics -- are 
already merged, so the only thing remaining is to add (and implement) the new 
input.buffer.max.bytes config. Does that sound right?

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls how many records 
> in maximum to bookkeep, and hence it is exceed we would pause fetching from 
> this partition. However this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> And hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config with a global, e.g. "input.buffer.max.bytes" 
> which controls how much bytes in total is allowed to be buffered. This is 
> doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14484: [1/N] Move PartitionMetadataFile to storage module [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14607:
URL: https://github.com/apache/kafka/pull/14607#discussion_r1372393891


##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+private static final Pattern WHITE_SPACES_PATTERN = 
Pattern.compile(":\\s+");
+
+private final String location;
+private final BufferedReader reader;
+
+public PartitionMetadataReadBuffer(
+String location,
+BufferedReader reader
+) {
+this.location = location;
+this.reader = reader;
+}
+
+PartitionMetadata read() throws IOException {
+String line = null;
+Uuid metadataTopicId;
+
+try {
+line = reader.readLine();
+String[] versionArr = WHITE_SPACES_PATTERN.split(line);
+
+if (versionArr.length == 2) {
+int version = Integer.parseInt(versionArr[1]);
+if (version == PartitionMetadataFile.CURRENT_VERSION) {

Review Comment:
   This is an existing issue. In the future, we may add a new field and bump up 
the version, to make it possible to downgrade, it's probably better to relax 
this check a bit.



##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadata.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;

Review Comment:
   It seems that this should be in the 
`org.apache.kafka.storage.internals.checkpoint` package?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/PartitionMetadataReadBuffer.java:
##
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.Uuid;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+public class PartitionMetadataReadBuffer {
+private static final Pattern WHITE_SPACES_PATTERN = 
Pattern.compile(":\\s+");
+
+private final String location;
+private final BufferedReader reader;
+
+public PartitionMetadataReadBuffer(
+String location,
+BufferedReader reader
+) {
+this.location = location;
+this.reader = reader;
+}
+
+PartitionMetadata read() throws IOException {
+String line = null;
+Uuid 

Re: [PR] KIP-951: protocol changes [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1372394072


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -360,7 +360,9 @@ public short partitionRecordVersion() {
 }
 
 public short fetchRequestVersion() {
-if (this.isAtLeast(IBP_3_5_IV1)) {
+if (this.isAtLeast(IBP_3_7_IV0)) {

Review Comment:
   Does this work because all the new fields are tagged (so no real changes in 
handling)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: protocol changes [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1372393724


##
clients/src/main/resources/common/message/FetchRequest.json:
##
@@ -53,7 +53,9 @@
   //
   // Version 15 adds the ReplicaState which includes new field ReplicaEpoch 
and the ReplicaId. Also,
   // deprecate the old ReplicaId field and set its default value to -1. 
(KIP-903)
-  "validVersions": "0-15",
+  //
+  // Version 16 is the same as version 15.
+  "validVersions": "0-16",

Review Comment:
   we should mark these APIs as in development by specifying 
`"latestVersionUnstable":true`
   See https://github.com/apache/kafka/pull/14046 for an example.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: protocol changes [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1372392217


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -102,6 +104,15 @@
   "about": "The preferred read replica for the consumer to use on its 
next fetch request"},
 { "name": "Records", "type": "records", "versions": "0+", 
"nullableVersions": "0+", "about": "The record data."}
   ]}
+]},
+{ "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", 
"taggedVersions": "16+", "tag": 0,

Review Comment:
   let's not reuse the tags as I mentioned in the other PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: protocol changes [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14627:
URL: https://github.com/apache/kafka/pull/14627#discussion_r1372391952


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -45,7 +45,9 @@
   // Version 14 is the same as version 13 but it also receives a new error 
called OffsetMovedToTieredStorageException (KIP-405)
   //
   // Version 15 is the same as version 14 (KIP-903).
-  "validVersions": "0-15",
+  //
+  // Version 16 adds the 'NodeEndpoints' field.

Review Comment:
   please include the kip as was done for the other versions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: protocol changes [kafka]

2023-10-25 Thread via GitHub


jolshan commented on PR #14627:
URL: https://github.com/apache/kafka/pull/14627#issuecomment-1780155389

   Hey there. If this is KAFKA-15661, then the title of the PR should be 
   `KAFKA-15661: KIP-951: Server side and protocol changes`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-25 Thread via GitHub


CalvinConfluent commented on code in PR #14593:
URL: https://github.com/apache/kafka/pull/14593#discussion_r1372384881


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -361,11 +410,16 @@ public Optional build() {
 
 maybeUpdateRecordElr(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
+// If ELR is enabled, the ISR is allowed to be empty.
+if (record.isr() == null && (!targetIsr.isEmpty() || 
eligibleLeaderReplicasEnabled) && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
 // Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
+if (targetIsr.isEmpty()) {
+log.info("A partition will have an empty ISR. " + this);

Review Comment:
   Good point. log will be flushed with this message during a shutdown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-25 Thread via GitHub


CalvinConfluent commented on code in PR #14593:
URL: https://github.com/apache/kafka/pull/14593#discussion_r1372384540


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -253,17 +269,30 @@ private ElectionResult electAnyLeader() {
 return new ElectionResult(NO_LEADER, false);
 }
 
+private boolean canElectLastKnownLeader() {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


kirktrue commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372381836


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in 
the {@link ConsumerRebalanceListener}
+ * interface. When consumer group partition assignment changes, these methods 
are invoked. This class wraps those
+ * callback calls with some logging, optional {@link Sensor} updates, etc.
+ */
+class ConsumerRebalanceListenerInvoker {
+
+private final Logger log;
+private final SubscriptionState subscriptions;
+private final Time time;
+private final ConsumerCoordinatorMetrics coordinatorMetrics;
+
+ConsumerRebalanceListenerInvoker(LogContext logContext,
+ SubscriptionState subscriptions,
+ Time time,
+ ConsumerCoordinatorMetrics 
coordinatorMetrics) {
+this.log = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.time = time;
+this.coordinatorMetrics = coordinatorMetrics;
+}
+
+Exception invokePartitionsAssigned(final SortedSet 
assignedPartitions) {
+log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions, ", "));
+
+Optional listener = 
subscriptions.rebalanceListener();
+
+if (listener.isPresent()) {
+try {
+final long startMs = time.milliseconds();
+listener.get().onPartitionsAssigned(assignedPartitions);
+
coordinatorMetrics.assignCallbackSensor.record(time.milliseconds() - startMs);
+} catch (WakeupException | InterruptException e) {
+throw e;
+} catch (Exception e) {
+log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
+listener.getClass().getName(), assignedPartitions, e);
+return e;
+}
+}
+
+return null;
+}
+
+Exception invokePartitionsRevoked(final SortedSet 
revokedPartitions) {
+log.info("Revoke previously assigned partitions {}", 
Utils.join(revokedPartitions, ", "));
+Set revokePausedPartitions = 
subscriptions.pausedPartitions();
+revokePausedPartitions.retainAll(revokedPartitions);
+if (!revokePausedPartitions.isEmpty())
+log.info("The pause flag in partitions [{}] will be removed due to 
revocation.", Utils.join(revokePausedPartitions, ", "));
+
+Optional listener = 
subscriptions.rebalanceListener();
+
+if (listener.isPresent()) {
+try {
+final long startMs = time.milliseconds();
+listener.get().onPartitionsRevoked(revokedPartitions);
+
coordinatorMetrics.revokeCallbackSensor.record(time.milliseconds() - startMs);
+} catch (WakeupException | InterruptException e) {
+throw e;
+} catch (Exception e) {

Review Comment:
   I can't speak to all the different use cases, but this is what is done in 
the existing `ConsumerCoordinator`, so this was done to keep the existing 
behavior equivalent.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1081,4 +1026,74 @@ boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 // logic
  

Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372379141


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   My understanding is that concurrency is not a problem in production logic 
(i.e. the fact that callback might actually execute at any time before or after 
or concurrently with the passing thread), but unit tests rely on deterministic 
order of execution and so adding concurrency here makes them "flaky".
   I agree in principle that if we could fold this logic into just one case 
(i.e. always schedule on a new request thread), it would make it simpler; when 
this change was proposed I looked into unit test fix and seemed more involved 
than adding this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-25 Thread via GitHub


apoorvmittal10 commented on PR #14575:
URL: https://github.com/apache/kafka/pull/14575#issuecomment-1780137826

   @junrao @mjsax @wcarlson5 The following tests are failing which are 
independent of changes in the PR.
   
   ```
   testTrustStoreAlter(String).quorum=kraft – 
kafka.server.DynamicBrokerReconfigurationTest
   15s
   shouldHaveSamePositionBoundActiveAndStandBy – 
org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   22s
   shouldHaveSamePositionBoundActiveAndStandBy – 
org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest
   12s
   shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing 
threads = false] – org.apache.kafka.streams.integration.EosIntegrationTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-25 Thread via GitHub


apoorvmittal10 commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1372379598


##
clients/src/main/java/org/apache/kafka/common/telemetry/package-info.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides mechanisms for collecting and emitting client telemetry metrics.
+ */
+package org.apache.kafka.common.telemetry;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372377851


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   Sorry maybe I'm not explaining myself well.
   
   We saw problems before when the callback was scheduled before the request 
returned. We fixed it by adding this check.
   
   If we want to remove this check, we will need to do further investigation 
for that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372376795


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure the that correct RequestLocal

Review Comment:
   Ensure the that => Ensure that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-25 Thread via GitHub


jolshan commented on PR #14591:
URL: https://github.com/apache/kafka/pull/14591#issuecomment-1780126154

   Sorry I lost track of this. I will take a look soon.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1372371671


##
clients/src/main/java/org/apache/kafka/common/telemetry/package-info.java:
##
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides mechanisms for collecting and emitting client telemetry metrics.
+ */
+package org.apache.kafka.common.telemetry;

Review Comment:
   Since this is not a public facing package, there seems to be no need to add 
this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]

2023-10-25 Thread via GitHub


kirktrue opened a new pull request, #14640:
URL: https://github.com/apache/kafka/pull/14640

   Added logic for reconciling partition assignment between the target 
assignment provided by the group coordinator and the current assignment as 
specified in the SubscriptionState.
   
   This refactors the ConsumerRebalanceListener code from ConsumerCoordinator 
for reuse in both places.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372366141


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   So, is this still a problem that we need to address in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-25 Thread via GitHub


mumrah commented on code in PR #14593:
URL: https://github.com/apache/kafka/pull/14593#discussion_r1372350163


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -253,17 +269,30 @@ private ElectionResult electAnyLeader() {
 return new ElectionResult(NO_LEADER, false);
 }
 
+private boolean canElectLastKnownLeader() {

Review Comment:
   Thanks, can we make these TRACE since these can be emitted for each 
partition? Also there are some long lines that need to be broken up



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -361,11 +410,16 @@ public Optional build() {
 
 maybeUpdateRecordElr(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
+// If ELR is enabled, the ISR is allowed to be empty.
+if (record.isr() == null && (!targetIsr.isEmpty() || 
eligibleLeaderReplicasEnabled) && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
 // Set the new ISR if it is different from the current ISR and 
unclean leader election didn't already set it.
+if (targetIsr.isEmpty()) {
+log.info("A partition will have an empty ISR. " + this);

Review Comment:
   I think we might want DEBUG here. If we are emptying out the ISR, it's not 
necessarily a problem that requires user intervention. Since we now allow 
emptying the ISR when the final replica goes down, we would end up logging this 
for every partition if someone did a full cluster shutdown. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372329928


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   That seemed to cause problems when David moved changed the code to do the 
callback before we finished processing the request. 
   
   We never figured out the root cause though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372322049


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   Thanks, Justine. It seems that we can't easily get rid of the ThreadLocal 
stuff. So, we can leave this as it is.
   
   > I am concerned that executing the callback and before we return from the 
request thread causes issues.
   
   With the current PR, it seems that it's possible for the callback to be 
called before the request handler thread finishes processing the request (that 
generates the callback), right? Is that causing any problem?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372308747


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
   producerId = batchInfo.producerId,
   producerEpoch = batchInfo.producerEpoch,
   topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, 
internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
   Thanks, Justine and Artem. Agreed that the alternative approach has its own 
drawback. So, we could punt on this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-25 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1372301334


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -776,64 +779,55 @@ public void verifyStore() {
 globalShouldRejectAllQueries();
 } else {
 shouldRejectUnknownQuery();
-shouldCollectExecutionInfo();
+//shouldCollectExecutionInfo();
 shouldCollectExecutionInfoUnderFailure();
 final String kind = this.kind;
 if (storeToTest.keyValue()) {
 if (storeToTest.timestamped()) {
-final Function, Integer> 
valueExtractor =
-ValueAndTimestamp::value;
-if (kind.equals("DSL")) {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangeDSLQueries(valueExtractor);
-} else {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangePAPIQueries(valueExtractor);
-}
-} else {
-final Function valueExtractor = 
Function.identity();
-if (kind.equals("DSL")) {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangeDSLQueries(valueExtractor);
-} else {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangePAPIQueries(valueExtractor);
-}
-}
-}
+shouldHandleKeyQuery(2,  5);
+shouldHandleTimestampedKeyQuery(2, 5);
+shouldHandleRangeQueries();
+shouldHandleTimestampRangeQueries();
+  }
+else {
+shouldHandleKeyQuery(2, 5);

Review Comment:
   For ROCK_KV DSL store, and this time Cache is true, the data we get is not 
through the `KeyValueToTimestampedKeyValueByteStoreAdapter `'s query



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15474) AbstractCoordinator.testWakeupAfterSyncGroupReceivedExternalCompletion seems flaky

2023-10-25 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779667#comment-17779667
 ] 

Philip Nee commented on KAFKA-15474:


{code:java}
[2023-10-25 13:27:51,171] ERROR JoinGroup failed: The group instance id 
Optional.empty has been fenced by another instance. Sent generation was 
Generation{generationId=-1, memberId='', protocol='null'} 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest$DummyCoordinator:683)
Should have woken up from 
ensureActiveGroup()org.opentest4j.AssertionFailedError: Should have woken up 
from ensureActiveGroup() at 
app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)   at 
app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)  at 
app//org.apache.kafka.clients.consumer.internals.AbstractCoordinatorTest.testWakeupAfterSyncGroupReceivedExternalCompletion(AbstractCoordinatorTest.java:1531)
   at 
java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)at 
java.base@15.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
  at 
java.base@15.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base@15.0.7/java.lang.reflect.Method.invoke(Method.java:564)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
  at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
  at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
   at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
  at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
  at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
   at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
   at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
  at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
  at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
   at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
  at 
app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)  
 at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
  at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
   at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
   at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
   at java.base@15.0.7/java.util.ArrayList.forEach(ArrayList.java:1511)
at 
app//org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
   at 

[PR] KAFKA-15562: ensure commit request manager handles errors correctly [kafka]

2023-10-25 Thread via GitHub


philipnee opened a new pull request, #14639:
URL: https://github.com/apache/kafka/pull/14639

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372270435


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
   producerId = batchInfo.producerId,
   producerEpoch = batchInfo.producerEpoch,
   topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, 
internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
   I think the way it's implemented right know is that we're just continuing 
processing from the current point after the network operation has completed, so 
as far as the "application" logic is concerned, the machinery to resume the 
execution on the request pool is hidden from the "application" logic.  This 
allows to do an arbitrary number of non-blocking waits with minimal disruption 
of the application logic flow (the change is localized in the point of 
non-blocking wait).
   In this case we could inject a new produce request (if I understand your 
question correctly) with some context saying that it's "verified", but I think 
it'd be more intrusive as we'd need to implement some produce-specific 
"verified" context (that would have to include verification guard and whatnot) 
and pass it down through all functions. We'd also need to consider the offset 
commit path, which is a separate path that eventually comes to this function.  
Also, if we consider a more general functionality (we don't have any, but 
hopefully, if we make Kafka more async there will be more cases), generally 
we'd want to preserve the progress that has been made until the network call, 
so if a request would need to work work XYZ and we have non-blocking network 
calls between stages, it would be good that the overall work would be X [RPC] Y 
[RPC] Z, rather than X [RPC] XY [RPC] XYZ.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-25 Thread via GitHub


florin-akermann commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1372233874


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
 LOG.debug("Optimizing the Kafka Streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
 }
+LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+rewriteRepartitionNodes();
 }
 
+private void rewriteRepartitionNodes() {

Review Comment:
   Ok, so basically we remove this optimization completley for now?
   Developers could still just filter out null keys with a 'filter' operator to 
achieve the old behavior.
   And then we make a separate ticket where developers can opt in to this 
optimization?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372169588


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   Right -- we had that before and it caused issues @dajac has more context. 
   I am concerned that executing the callback and before we return from the 
request thread causes issues.
   
   In addition, for tests, we would need to set up the request channel etc. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372169588


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   Right -- we had that before and it caused issues @dajac has more context. 
   In addition, for tests, we would need to set up the request channel etc. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15388) Handle topics that were having compaction as retention earlier are changed to delete only retention policy and onboarded to tiered storage.

2023-10-25 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779623#comment-17779623
 ] 

Arpit Goyal commented on KAFKA-15388:
-

Thanks [~divijvaidya] for providing the initial steps  required to work on 
this. I will go through it.Do you remember any KIP created in past related to 
compacting feature ?

> Handle topics that were having compaction as retention earlier are changed to 
> delete only retention policy and onboarded to tiered storage. 
> 
>
> Key: KAFKA-15388
> URL: https://issues.apache.org/jira/browse/KAFKA-15388
> Project: Kafka
>  Issue Type: Bug
>Reporter: Satish Duggana
>Assignee: Arpit Goyal
>Priority: Blocker
> Fix For: 3.7.0
>
>
> Context: [https://github.com/apache/kafka/pull/13561#discussion_r1300055517]
>  
> There are 3 paths I looked at:
>  * When data is moved to remote storage (1)
>  * When data is read from remote storage (2)
>  * When data is deleted from remote storage (3)
> (1) Does not have a problem with compacted topics. Compacted segments are 
> uploaded and their metadata claims they contain offset from the baseOffset of 
> the segment until the next segment's baseOffset. There are no gaps in offsets.
> (2) Does not have a problem if a customer is querying offsets which do not 
> exist within a segment, but there are offset after the queried offset within 
> the same segment. *However, it does have a problem when the next available 
> offset is in a subsequent segment.*
> (3) For data deleted via DeleteRecords there is no problem. For data deleted 
> via retention there is no problem.
>  
> *I believe the proper solution to (2) is to make tiered storage continue 
> looking for the next greater offset in subsequent segments.*
> Steps to reproduce the issue:
> {code:java}
> // TODO (christo)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372156380


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   @jolshan : I wasn't suggesting simply skip the test 
`threadCurrentRequest.get() == currentRequest`. I was wondering if we could 
always add the callback to the `callbackQueue` through the following call, 
independent of whether the caller is in the same request thread of not. 
   
   ` requestChannel.sendCallbackRequest`
   
   So, `wrap` will just be the following.
   
   ```
 def wrap[T](fun: T => Unit): T => Unit = {
 
requestChannel.sendCallbackRequest(RequestChannel.CallbackRequest(() => fun(T), 
currentRequest))
 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372156380


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   @jolshan : I wasn't suggesting simply skip the test 
`threadCurrentRequest.get() == currentRequest`. I was wondering if we could 
always add the callback to the `callbackQueue` through the following call, 
independent of whether the caller is in the same request thread of not. 
   
   ` requestChannel.sendCallbackRequest`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372156380


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   @jolshan : I wasn't suggesting simply skip the test 
`threadCurrentRequest.get() == currentRequest`. I was wondering if we could add 
the callback to the `callbackQueue` through the following call, independent of 
whether the caller is in the same request thread of not. 
   
   ` requestChannel.sendCallbackRequest`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-10-25 Thread via GitHub


jolshan commented on PR #14242:
URL: https://github.com/apache/kafka/pull/14242#issuecomment-1779818679

   Thanks -- just catching up with the discussion. Just to clarify when we say:
   
   > This is ok for server restart, because on restart, we will rebuild the 
snapshot by scanning last few segments.
   
   In the restart case, we may take a slight performance hit on startup since 
we may have to scan more segments.
   And yeah, we should definitely not update the recovery point until the flush 
is completed successfully.
   
   > If we ignore producer-state-flush failure here, recovery-point might be 
incremented even with stale on-disk producer state snapshot. So, in case of 
restart after power failure, the broker might restore stale producer state 
without rebuilding (since recovery point is incremented) which could cause 
idempotency issues. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372148618


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   The code you provided would first take the arg value from globals and then 
from context, we want the first priority to be from context args and then 
global and if nothing is provided then we use the default value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372148618


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   The code you provided would first take the arg value from globals and then 
from context, we want the first priority to be from context args and then 
global and if nothing is provided then we use the default value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372148618


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   The code you provided would first take from globals and then from context, 
we want the first priority to be from context args and then global and if 
nothing is provided then we use the default value



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add pre-requisite check in release.py [kafka]

2023-10-25 Thread via GitHub


divijvaidya commented on code in PR #14636:
URL: https://github.com/apache/kafka/pull/14636#discussion_r1372147994


##
release.py:
##
@@ -491,6 +491,25 @@ def command_release_announcement_email():
 Do you have all of of these setup? (y/n): """ % (PREFS_FILE, json.dumps(prefs, 
indent=2))):
 fail("Please try again once you have all the prerequisites ready.")
 
+apache_id = sanitize_input("Please enter your apache-id: ")
+
+print("Begin to check if you have met all the pre-requisites for the release 
process")
+test_maven = cmd_output("mvn -v")
+if "Apache Maven" in test_maven:
+print("Pre-requisite met: You have maven cli in place")
+else:
+fail("Pre-requisite not met: You need to install maven CLI")

Review Comment:
   I tested it and it did not fail so nicely for me :)
   
   ```
   Do you have all of of these setup? (y/n): y
   Please enter your apache-id: divijv
   Begin to check if you have met all the pre-requisites for the release process
   Traceback (most recent call last):
 File "/opt/kafka/./release.py", line 497, in 
   test_maven = cmd_output("mvn -v")
 File "/opt/kafka/./release.py", line 140, in cmd_output
   return subprocess.check_output(cmd, *args, stderr=subprocess.STDOUT, 
**kwargs).decode('utf-8')
 File "/usr/lib64/python3.9/subprocess.py", line 424, in check_output
   return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
 File "/usr/lib64/python3.9/subprocess.py", line 505, in run
   with Popen(*popenargs, **kwargs) as process:
 File "/usr/lib64/python3.9/subprocess.py", line 951, in __init__
   self._execute_child(args, executable, preexec_fn, close_fds,
 File "/usr/lib64/python3.9/subprocess.py", line 1821, in _execute_child
   raise child_exception_type(errno_num, err_msg, err_filename)
   FileNotFoundError: [Errno 2] No such file or directory: 'mvn'
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372147456


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   I asked Ian in the office yesterday, he said the current implementation 
should work



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372146226


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   > What would happen if context.injected_args is defined but not for 
use_new_coordinator
   
   If get doesn't find the arg name then it uses the default value I provided 
as the second argument



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372146226


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   > What would happen if context.injected_args is defined but not for 
use_new_coordinator
   If get doesn't find the arg name then it uses the default value I provided 
as the second argument



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-25 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1372146226


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   > What would happen if context.injected_args is defined but not for 
use_new_coordinator



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1364689270


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
 // we manually override the state offset here prior to taking the snapshot.
 producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-producerStateManager.takeSnapshot()
+// We avoid potentially-costly fsync call, since we acquire 
UnifiedLog#lock here
+// which could block subsequent produces in the meantime.
+// flush is done in the scheduler thread along with segment flushing below
+val maybeSnapshot = producerStateManager.takeSnapshot(false)
 updateHighWatermarkWithLogEndOffset()
 // Schedule an asynchronous flush of the old segment
-scheduler.scheduleOnce("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
+scheduler.scheduleOnce("flush-log", () => {
+  maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, 
"producer-snapshot"))

Review Comment:
   If we fail to flush the snapshot, it seems that we should propagate the 
IOException to logDirFailureChannel like in flushUptoOffsetExclusive. 
Otherwise, we could be skipping the recovery of producer state when we should. 



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -308,7 +308,14 @@ public void truncateFromEnd(long endOffset) {
 if (endOffset >= 0 && epochEntry.isPresent() && 
epochEntry.get().startOffset >= endOffset) {
 List removedEntries = removeFromEnd(x -> 
x.startOffset >= endOffset);
 
-flush();
+// We intentionally don't force flushing change to the device 
here because:
+// - To avoid fsync latency
+//   * fsync latency could be huge on a disk glitch, which is 
not rare in spinning drives
+//   * This method is called by ReplicaFetcher threads, which 
could block replica fetching
+// then causing ISR shrink or high produce response time 
degradation in remote scope on high fsync latency.
+// - Even when stale epochs remained in LeaderEpoch file due 
to the unclean shutdown, it will be handled by
+//   another truncateFromEnd call on log loading procedure so 
it won't be a problem
+flush(false);

Review Comment:
   It's kind of weird to call `flush` with sync = false since the only thing 
that `flush` does is to sync. Could we just avoid calling `flush`?



##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -152,7 +152,7 @@ private List 
removeWhileMatching(Iterator lastEntry(long 
producerId) {
 }
 
 /**
- * Take a snapshot at the current end offset if one does not already exist.
+ * Take a snapshot at the current end offset if one does not already exist 
with syncing the change to the device
  */
 public void takeSnapshot() throws IOException {
+takeSnapshot(true);
+}
+
+/**
+ * Take a snapshot at the current end offset if one does not already 
exist, then return the snapshot file if taken.
+ */
+public Optional takeSnapshot(boolean sync) throws IOException {

Review Comment:
   `ProducerStateManager.truncateFullyAndReloadSnapshots` removes all snapshot 
files and then calls `loadSnapshots()`, which should return empty. I am 
wondering what happens if we have an pending async snapshot flush and the flush 
is called after the underlying file is deleted because of 
`ProducerStateManager.truncateFullyAndReloadSnapshots`. Will that cause the 
file to be recreated or will it get an `IOException`? The former will be bad 
since the content won't be correct. For the latter, it would be useful to 
distinguish that from a real disk IO error to avoid unnecessarily crash the 
broker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamsMetricsImplTest to Mockito [kafka]

2023-10-25 Thread via GitHub


divijvaidya commented on code in PR #14623:
URL: https://github.com/apache/kafka/pull/14623#discussion_r1372120378


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java:
##
@@ -1293,43 +1204,33 @@ private void verifyMetric(final String name,
 public void shouldMeasureLatency() {
 final long startTime = 6;
 final long endTime = 10;
-final Sensor sensor = createMock(Sensor.class);
-expect(sensor.shouldRecord()).andReturn(true);
-expect(sensor.hasMetrics()).andReturn(true);
-sensor.record(endTime - startTime);
+final Sensor sensor = mock(Sensor.class);
+when(sensor.shouldRecord()).thenReturn(true);
+when(sensor.hasMetrics()).thenReturn(true);
+doNothing().when(sensor).record(endTime - startTime);

Review Comment:
   for my curiosity, does strict subs verify usage of stubs defined using 
`doNothin()` as well or just for subs defined using `when()`



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java:
##
@@ -146,69 +135,26 @@ public class StreamsMetricsImplTest {
 private final MockTime time = new MockTime(0);
 private final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, CLIENT_ID, VERSION, time);
 
-private static MetricConfig eqMetricConfig(final MetricConfig 
metricConfig) {
-EasyMock.reportMatcher(new IArgumentMatcher() {
-private final StringBuffer message = new StringBuffer();
-
-@Override
-public boolean matches(final Object argument) {
-if (argument instanceof MetricConfig) {
-final MetricConfig otherMetricConfig = (MetricConfig) 
argument;
-final boolean equalsComparisons =
-(otherMetricConfig.quota() == metricConfig.quota() ||
-
otherMetricConfig.quota().equals(metricConfig.quota())) &&
-otherMetricConfig.tags().equals(metricConfig.tags());
-if (otherMetricConfig.eventWindow() == 
metricConfig.eventWindow() &&
-otherMetricConfig.recordLevel() == 
metricConfig.recordLevel() &&
-equalsComparisons &&
-otherMetricConfig.samples() == metricConfig.samples() 
&&
-otherMetricConfig.timeWindowMs() == 
metricConfig.timeWindowMs()) {
-
-return true;
-} else {
-message.append("{ ");
-message.append("eventWindow=");
-message.append(otherMetricConfig.eventWindow());
-message.append(", ");
-message.append("recordLevel=");
-message.append(otherMetricConfig.recordLevel());
-message.append(", ");
-message.append("quota=");
-message.append(otherMetricConfig.quota().toString());
-message.append(", ");
-message.append("samples=");
-message.append(otherMetricConfig.samples());
-message.append(", ");
-message.append("tags=");
-message.append(otherMetricConfig.tags().toString());
-message.append(", ");
-message.append("timeWindowMs=");
-message.append(otherMetricConfig.timeWindowMs());
-message.append(" }");
-}
-}
-message.append("not a MetricConfig object");
-return false;
-}
-
-@Override
-public void appendTo(final StringBuffer buffer) {
-buffer.append(message);
-}
-});
-return null;
-}
-
-private Capture addSensorsOnAllLevels(final Metrics metrics, final 
StreamsMetricsImpl streamsMetrics) {
-final Capture sensorKeys = newCapture(CaptureType.ALL);
+private static boolean eqMetricConfig(final MetricConfig thisMetricConfig, 
final MetricConfig thatMetricConfig) {

Review Comment:
   This we are not printing details when assertion fails, can you please add a 
message to print the thatMetricConfig similar to 
`assertTrue(eqMetricConfig(expected, actual), "my message here")`
   
   Will help in debugging failed tests and maintain parity with existing code. 
Maybe someone added such detailed logging in one place because debugging this 
was very hard!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact 

Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372124366


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
   producerId = batchInfo.producerId,
   producerEpoch = batchInfo.producerEpoch,
   topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, 
internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
   I'm not sure exactly how this would work. I suppose we could have a flag 
"verified" in some sort of context, but where is that context stored? 
   
   Right now the request considers that some partitions are verified and some 
are not. (I'm not sure the producer actually sends more than one partition), 
but the code is currently generalizable to handle multiple partitions. If this 
is the case the context needs to store which partitions are verified and which 
are not. 
   
   I agree that this is complicated, but I would need some time to think about 
how this would work. I'm also not sure the implications of going through the 
.handle of the request path -- it would be as if we received another request in 
the metrics etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


philipnee commented on PR #14638:
URL: https://github.com/apache/kafka/pull/14638#issuecomment-1779768168

   Hi @kirktrue - I made a pass and left some comments. Let me know if anything 
is unclear!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372118635


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -162,13 +162,13 @@ else if (this.subscriptionType != type)
 throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
 }
 
-public synchronized boolean subscribe(Set topics, 
ConsumerRebalanceListener listener) {
+public synchronized boolean subscribe(Set topics, 
Optional listener) {

Review Comment:
   Actually - is listener actually optional here? It seems like if the listener 
is null, it should complain? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372117884


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -285,10 +285,8 @@ public synchronized void 
assignFromSubscribed(Collection assignm
 this.assignment.set(assignedPartitionStates);
 }
 
-private void registerRebalanceListener(ConsumerRebalanceListener listener) 
{
-if (listener == null)
-throw new IllegalArgumentException("RebalanceListener cannot be 
null");
-this.rebalanceListener = listener;
+private void registerRebalanceListener(Optional 
listener) {
+this.rebalanceListener = Objects.requireNonNull(listener, 
"RebalanceListener cannot be null");

Review Comment:
   Does Objects.requireNonNull checks the lister.get() or the listener? Since 
you are passing in an Optional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372115691


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -162,13 +162,13 @@ else if (this.subscriptionType != type)
 throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
 }
 
-public synchronized boolean subscribe(Set topics, 
ConsumerRebalanceListener listener) {
+public synchronized boolean subscribe(Set topics, 
Optional listener) {

Review Comment:
   Another suggestion is to not change the interface, then we do:
   
   registerRebalanceListener(Optional.ofNullabe(listener)) - this prevents us 
to do a requireNonNull check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372114475


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   Without this, there were a ton of failing tests when David implemented his 
change. We never really figured out the issue. We also have to consider the 
case where we bypass the thread check (ie tests) where we don't have the 
callback queue set up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372112998


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -1081,4 +1026,74 @@ boolean updateAssignmentMetadataIfNeeded(Timer timer) {
 // logic
 return updateFetchPositions(timer);
 }
+
+@Override
+public void subscribe(Collection topics) {
+subscribeInternal(topics, Optional.empty());
+}
+
+@Override
+public void subscribe(Collection topics, ConsumerRebalanceListener 
listener) {
+if (listener == null)
+throw new IllegalArgumentException("RebalanceListener cannot be 
null");
+
+subscribeInternal(topics, Optional.of(listener));
+}
+
+@Override
+public void subscribe(Pattern pattern) {
+subscribeInternal(pattern, Optional.empty());
+}
+
+@Override
+public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+if (listener == null)
+throw new IllegalArgumentException("RebalanceListener cannot be 
null");
+
+subscribeInternal(pattern, Optional.of(listener));
+}
+
+private void subscribeInternal(Pattern pattern, 
Optional listener) {
+maybeThrowInvalidGroupIdException();
+if (pattern == null || pattern.toString().isEmpty())
+throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+"null" : "empty"));
+
+throwIfNoAssignorsConfigured();
+log.info("Subscribed to pattern: '{}'", pattern);
+subscriptions.subscribe(pattern, listener);
+updatePatternSubscription(metadata.fetch());
+metadata.requestUpdateForNewTopics();
+}
+
+private void subscribeInternal(Collection topics, 
Optional listener) {
+maybeThrowInvalidGroupIdException();
+if (topics == null)
+throw new IllegalArgumentException("Topic collection to subscribe 
to cannot be null");
+if (topics.isEmpty()) {
+// treat subscribing to empty topic list as the same as 
unsubscribing
+unsubscribe();
+} else {
+for (String topic : topics) {
+if (isBlank(topic))
+throw new IllegalArgumentException("Topic collection to 
subscribe to cannot contain null or empty topic");
+}
+
+throwIfNoAssignorsConfigured();
+
+// Clear the buffered data which are not a part of newly assigned 
topics
+final Set currentTopicPartitions = new HashSet<>();
+
+for (TopicPartition tp : subscriptions.assignedPartitions()) {
+if (topics.contains(tp.topic()))

Review Comment:
   ah, seems like this is done for my comment above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


philipnee commented on code in PR #14638:
URL: https://github.com/apache/kafka/pull/14638#discussion_r1372104074


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in 
the {@link ConsumerRebalanceListener}
+ * interface. When consumer group partition assignment changes, these methods 
are invoked. This class wraps those
+ * callback calls with some logging, optional {@link Sensor} updates, etc.
+ */
+class ConsumerRebalanceListenerInvoker {
+
+private final Logger log;
+private final SubscriptionState subscriptions;
+private final Time time;
+private final ConsumerCoordinatorMetrics coordinatorMetrics;
+
+ConsumerRebalanceListenerInvoker(LogContext logContext,
+ SubscriptionState subscriptions,
+ Time time,
+ ConsumerCoordinatorMetrics 
coordinatorMetrics) {
+this.log = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.time = time;
+this.coordinatorMetrics = coordinatorMetrics;
+}
+
+Exception invokePartitionsAssigned(final SortedSet 
assignedPartitions) {
+log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions, ", "));
+
+Optional listener = 
subscriptions.rebalanceListener();
+
+if (listener.isPresent()) {
+try {
+final long startMs = time.milliseconds();
+listener.get().onPartitionsAssigned(assignedPartitions);
+
coordinatorMetrics.assignCallbackSensor.record(time.milliseconds() - startMs);
+} catch (WakeupException | InterruptException e) {
+throw e;
+} catch (Exception e) {
+log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
+listener.getClass().getName(), assignedPartitions, e);
+return e;
+}
+}
+
+return null;
+}
+
+Exception invokePartitionsRevoked(final SortedSet 
revokedPartitions) {
+log.info("Revoke previously assigned partitions {}", 
Utils.join(revokedPartitions, ", "));
+Set revokePausedPartitions = 
subscriptions.pausedPartitions();
+revokePausedPartitions.retainAll(revokedPartitions);
+if (!revokePausedPartitions.isEmpty())
+log.info("The pause flag in partitions [{}] will be removed due to 
revocation.", Utils.join(revokePausedPartitions, ", "));
+
+Optional listener = 
subscriptions.rebalanceListener();
+
+if (listener.isPresent()) {
+try {
+final long startMs = time.milliseconds();
+listener.get().onPartitionsRevoked(revokedPartitions);
+
coordinatorMetrics.revokeCallbackSensor.record(time.milliseconds() - startMs);
+} catch (WakeupException | InterruptException e) {
+throw e;
+} catch (Exception e) {

Review Comment:
   we should definitely return all KafkaExceptions; are there java exceptions 
that we want to throw back immediately to the user though? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this 

Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-25 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1372064089


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   The case when the callback is executed on the same request thread only 
happens when the txn coordinator is not available. Since it's a rare case, I am 
wondering if we really need to optimize for that. It seems that it's simpler to 
always enqueue the callback. Then, we could probably get rid of the ThreadLocal 
stuff.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -846,7 +759,8 @@ class ReplicaManager(val config: KafkaConfig,
   producerId = batchInfo.producerId,
   producerEpoch = batchInfo.producerEpoch,
   topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
-  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+  callback = 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, 
internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,

Review Comment:
   This approach works. It's bit complicated since there are multiple callbacks 
(from `KafkaApis` and from `ReplicaManager`) wrapped in different levels. An 
alternative approach is to simply re-enqueue the original request and some kind 
of context from the result of pre-validation. When the pre-validation is done, 
we could just run the logic on the original request again through KafkaAPIs but 
with the new context, which will allow us to bypass the validation. This avoids 
the need to pass `appendEntries` as a callback. Will that be simpler?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,107 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],

Review Comment:
   Could we add a comment that this method will potentially be called in a 
different request thread and it should avoid accessing any thread unsafe data 
structures?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


kirktrue commented on PR #14638:
URL: https://github.com/apache/kafka/pull/14638#issuecomment-1779709409

   @dajac @philipnee Can you add the `ctr` and `KIP-848` tags, please? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


kirktrue opened a new pull request, #14638:
URL: https://github.com/apache/kafka/pull/14638

   Straightforward refactoring to extract an inner class and methods related to 
`ConsumerRebalanceListener` for reuse in the KIP-848 implementation of the 
consumer group protocol. Also using `Optional` to explicitly mark when a 
`ConsumerRebalanceListener` is in use or not, allowing us to make some 
(forthcoming) optimizations when there is no listener to invoke.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


kirktrue commented on PR #14565:
URL: https://github.com/apache/kafka/pull/14565#issuecomment-1779699978

   This is now outdated. Please see #14638.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-10-25 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-15680:
-
Fix Version/s: 3.6.1

> Partition-Count is not getting updated Correctly in the Incremental 
> Co-operative Rebalancing(ICR) Mode of Rebalancing
> -
>
> Key: KAFKA-15680
> URL: https://issues.apache.org/jira/browse/KAFKA-15680
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Assignee: Pritam Kumar
>Priority: Minor
> Fix For: 3.6.1
>
>
> * In ICR(Incremental Cooperative Rebalancing) mode, whenever a new worker, 
> say Worker 3 joins, a new global assignment is computed by the leader, say 
> Worker1, that results in the revocation of some tasks from each existing 
> worker i.e Worker1 and Worker2.
>  * Once the new member join is completed, 
> *ConsumerCoordinator.OnJoinComplete()* method is called which primarily 
> computes all the new partitions assigned and the partitions which are revoked 
> and updates the subscription Object.
>  * If it was the case of revocation which we check by checking the 
> “partitonsRevoked” list, we call the method {*}“invoke{*}PartitionRevoked()” 
> which internally calls “updatePartitionCount()” which fetches partition from 
> the *assignment* object which is yet not updated by the new assignment.
>  * It is only just before calling the “{*}invokePartitionsAssigned{*}()” 
> method that we update the *assignment* by invoking the following → 
> *subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-25 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1372064203


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -190,7 +192,7 @@ class BrokerLifecycleManager(
   /**
* The broker epoch from the previous run, or -1 if the epoch is not able to 
be found.
*/
-  @volatile private var previousBrokerEpoch: Long = -1L
+  @volatile private var previousBrokerEpoch: OptionalLong = 
OptionalLong.empty()

Review Comment:
   Done.



##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CleanShutdownFileHandler.java:
##
@@ -45,17 +44,22 @@
 
 public class CleanShutdownFileHandler {
 public static final String CLEAN_SHUTDOWN_FILE_NAME = 
".kafka_cleanshutdown";
-private final File cleanShutdownFile;
+// Visible for testing
+public final File cleanShutdownFile;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-25 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1372063969


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CleanShutdownFileHandler.java:
##
@@ -85,17 +87,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);
-
-brokerEpoch = 
Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1L"));
+Content content = OBJECT_MAPPER.readValue(text, Content.class);

Review Comment:
   Thanks! Good to know we have a package to do it.



##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/CleanShutdownFileHandler.java:
##
@@ -73,10 +77,8 @@ void write(long brokerEpoch, int version) throws Exception {
 FileOutputStream os = new FileOutputStream(cleanShutdownFile);
 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os, 
StandardCharsets.UTF_8));
 try {
-Map payload = new HashMap<>();
-payload.put(Fields.VERSION.toString(), Integer.toString(version));
-payload.put(Fields.BROKER_EPOCH.toString(), 
Long.toString(brokerEpoch));
-bw.write(new ObjectMapper().writeValueAsString(payload));
+Content content = new Content(version, brokerEpoch);
+bw.write(OBJECT_MAPPER.writeValueAsString(content));

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: avoid blocking for randomness in DefaultRecordBatchTest [kafka]

2023-10-25 Thread via GitHub


gnarula commented on code in PR #14625:
URL: https://github.com/apache/kafka/pull/14625#discussion_r1372058951


##
clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java:
##
@@ -63,15 +61,9 @@
 import static org.mockito.Mockito.when;
 
 public class DefaultRecordBatchTest {
-private static final Random RANDOM;
-
-static {
-try {
-RANDOM = SecureRandom.getInstanceStrong();
-} catch (NoSuchAlgorithmException e) {
-throw new RuntimeException(e);
-}
-}
+// We avoid SecureRandom.getInstanceStrong() here because it reads from 
/dev/random and blocks on Linux. Since these
+// tests don't require cryptographically strong random data, we avoid a 
CSPRNG (SecureRandom) altogether.
+private static final Random RANDOM = new Random();

Review Comment:
   Addressed in 
[afc7129](https://github.com/apache/kafka/pull/14625/commits/afc71297c805eb149a3dee450cd4e204c4fb57d9)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Add explicit exit call to each shell command [kafka]

2023-10-25 Thread via GitHub


gharris1727 commented on PR #14633:
URL: https://github.com/apache/kafka/pull/14633#issuecomment-1779677563

   Hi @tkornai thanks for the PR!
   
   I think it's a relatively common expectation that returning from `main` will 
exit the program, and this problem is likely to regress in the future if we're 
not careful.
   
   Do you have source access over the custom security provider implementation? 
It may be appropriate to mark the security provider's thread as a Daemon 
thread, with `thread.setDaemon(true)`. This should have the same effect as 
explicitly calling `exit(0)`, since daemon threads are automatically stopped 
during JVM shutdown.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-25 Thread via GitHub


kirktrue closed pull request #14565: KAFKA-15628: Refactor 
ConsumerRebalanceListener invocation for reuse
URL: https://github.com/apache/kafka/pull/14565


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15276: Implement partition assignment reconciliation [kafka]

2023-10-25 Thread via GitHub


kirktrue closed pull request #14357: KAFKA-15276: Implement partition 
assignment reconciliation
URL: https://github.com/apache/kafka/pull/14357


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372047782


##
core/src/test/scala/kafka/metrics/ClientMetricsTestUtils.scala:
##
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import ClientMetricsConfig.ClientMatchingParams.{CLIENT_SOFTWARE_NAME, 
CLIENT_SOFTWARE_VERSION}
+
+import java.util.Properties
+
+object ClientMetricsTestUtils {
+  val defaultPushInterval = 30 * 1000 // 30 seconds

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372046737


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.Kafka.info
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case e: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()
+
+  private def init(clientInstanceId: String,
+   clientId: String,
+   softwareName: String,
+   softwareVersion: String,
+   clientHostAddress: String,
+   clientPort: String): Unit = {
+attributesMap(CLIENT_INSTANCE_ID) = clientInstanceId
+attributesMap(CLIENT_ID) = clientId
+attributesMap(CLIENT_SOFTWARE_NAME) = softwareName
+attributesMap(CLIENT_SOFTWARE_VERSION) = softwareVersion
+attributesMap(CLIENT_SOURCE_ADDRESS) = clientHostAddress
+attributesMap(CLIENT_SOURCE_PORT) = clientPort
+  }
+  def getClientId: Option[String] = attributesMap.get(CLIENT_ID)
+
+  def isMatched(patterns: Map[String, String]) : Boolean = {
+// Empty pattern or missing pattern still considered as a match
+if (patterns == null || patterns.isEmpty) {
+  true
+} else {
+  matchPatterns(patterns)
+}
+  }
+
+  private def matchPatterns(matchingPatterns: Map[String, String]) : Boolean = 
{
+try {
+  matchingPatterns.foreach {
+  case 

Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-25 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1372045729


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.Kafka.info
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case e: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()
+
+  private def init(clientInstanceId: String,
+   clientId: String,
+   softwareName: String,
+   softwareVersion: String,
+   clientHostAddress: String,

Review Comment:
   Dne, also changed other variables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   >