[GitHub] [kafka] satishd opened a new pull request, #13535: [DRAFT] KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-10 Thread via GitHub


satishd opened a new pull request, #13535:
URL: https://github.com/apache/kafka/pull/13535

   This PR is not yet ready for review. It is built on top of other PR-13487
   
   This PR includes 
   - Recognize the fetch requests with out of range local log offsets
   - Add fetch implementation for the data lying in the range of 
[logStartOffset, localLogStartOffset]
   - Add a new purgatory for async remote read requests which are served 
through a specific thread pool
   
   todo: Add more tests for the newly introduced changes. There are some tests 
available for these scenarios in 2.8.x, refactoring with the trunk changes. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] emissionnebula commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


emissionnebula commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502743377

   > @ijuma @rondagostino : I feel like "persistent" should appear somewhere in 
the class names here. Perhaps you're right that it doesn't need to be in the 
short class name, but can we put the classes in a namespace that includes that 
word? So something like `org.apache.kafka.server.persistent`? Then we'd have 
`org.apache.kafka.server.persistent.ImmutableMap`, etc.
   
   > PMap or PersistentMap are also possible, but they can be confused with a 
map that persists the data
   
   I feel we should not only use the `persistent` in the package name but also 
name the class `PersistentMap`. Looks like persistent is not a new term, there 
was a 1989 
[paper](https://www.cs.cmu.edu/~sleator/papers/making-data-structures-persistent.pdf)
 that coined this term. Clojure also defines them as PersistentVector and 
PersistentMap. Languages such as Scala, Elixir, and Haskell use the term 
persistent in the documentation as well. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14889) RemoteLogManager - allow consumer fetch records from remote storage implementation

2023-04-10 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14889:
-

 Summary: RemoteLogManager - allow consumer fetch records from 
remote storage implementation 
 Key: KAFKA-14889
 URL: https://issues.apache.org/jira/browse/KAFKA-14889
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen


Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA 
covers enabling consumers fetch records from remote storage

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]
h4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation

2023-04-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-14888:
-

Assignee: Luke Chen

> RemoteLogManager - deleting expired/size breached log segments to remote 
> storage implementation 
> 
>
> Key: KAFKA-14888
> URL: https://issues.apache.org/jira/browse/KAFKA-14888
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA 
> covers deleting time/size breached log segments in remote storage.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14889) RemoteLogManager - allow consumer fetch records from remote storage implementation

2023-04-10 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-14889:
-

Assignee: Satish Duggana

> RemoteLogManager - allow consumer fetch records from remote storage 
> implementation 
> ---
>
> Key: KAFKA-14889
> URL: https://issues.apache.org/jira/browse/KAFKA-14889
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Satish Duggana
>Priority: Major
>
> Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA 
> covers enabling consumers fetch records from remote storage
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]
> h4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

2023-04-10 Thread via GitHub


vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1162213072


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##
@@ -153,11 +154,37 @@ public void init(final ProcessorContext context) {
 
 @Override
 public ValueAndTimestamp get(final K key) {
-final ValueAndTimestamp valueAndTimestamp1 = 
valueGetter1.get(key);
+return computeJoin(key, valueGetter1::get, valueGetter2::get);
+}
+
+@Override
+public ValueAndTimestamp get(final K key, final long 
asOfTimestamp) {

Review Comment:
   > You say "older join result" -- don't think they would get a join result, 
would they?
   
   They get what should've been a join result, if the join were to emit a 
complete history of older join results (which it does not due to computational 
expediency). Here's a concrete example to check we're on the same page. Suppose 
we have an inner join, and all records are for the same key:
   ```
   A: (a5, ts=5)
   B: (b1, ts=1) --> triggers join result (a5, b1) with ts=5
   A: (a2, ts=2) --> no new join result, because this record is out-of-order
   ```
   If the result is not materialized and someone calls `get(k, 2)` on the value 
getter, then the value getter will join `a2` and `b1` on the fly and return 
`(a2, b1)` even though this was never emitted downstream. 
   
   I gave this some more thought and I think the behavior could be desirable, 
even though I agree with your statement:
   > given how the "join processor" works, it basically get a versioned input 
ktable, and produced a non-versioned ktable and drops out-of-order records. So 
if we would only expose a value-getter that does not support get(k, ts) it 
would be reasonable to me.
   
   I think there actually is a situation in which `get(k, ts)` would be called 
on this join value getter today. If the table-table join result is not 
materialized, and is directly joined to a stream, then if the table-table join 
result is identified as "versioned" then the stream-table join will call 
`get(k, ts)` on the value getter. This situation is really interesting because 
it would be wrong for the user to explicitly materialize the result of the 
table-table join with a versioned store, and then join it to the stream, but if 
they do not explicitly materialize the result and instead perform the join 
directly, then they can get proper stream-table join semantics using the value 
getter.
   
   Assuming my understanding is correct, then we have two options:
   1. Say that the result of a table-table join (where both input tables are 
versioned) where the result is not explicitly materialized is versioned, and 
have the value getter support versioning, as in the current PR. Then the 
stream-(table-table) join uses fully versioned semantics and returns correct 
results.
   2. Say the the result of a table-table join (where both input tables are 
versioned) where the result is not explicitly materialized is not versioned, 
and update the value getter to reflect this. Then the stream-(table-table) join 
does not use versioned semantics. Users need to perform a (stream-table)-table 
join to get versioned semantics instead.
   
   The first option is nice in that now `stream-(table-table)` and 
`(stream-table)-table` joins with no intermediate materialization produce the 
same results, but it's also confusing because `stream-(table-table)` produces 
different results if the user materializes the result of the table-table join 
as a versioned store (which is wrong). 
   
   WDYT? I'm happy with either approach now that I feel like we've discussed 
all angles fully. We do need to make a decision in this PR though, if my 
understanding about `get(k, ts)` being called from downstream stream-table 
joins if it's supported is correct else there will be compatibility 
implications for changing it in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation

2023-04-10 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14888:
-

 Summary: RemoteLogManager - deleting expired/size breached log 
segments to remote storage implementation 
 Key: KAFKA-14888
 URL: https://issues.apache.org/jira/browse/KAFKA-14888
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen


Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA 
covers deleting time/size breached log segments in remote storage.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14834) Improved processor semantics for versioned stores

2023-04-10 Thread Victoria Xia (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Xia updated KAFKA-14834:
-
Description: 
With the introduction of versioned state stores in 
[KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
 we should leverage them to provide improved join semantics. 

As described in 
[KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores],
 we will make the following four improvements:
 * stream-table joins will perform a timestamped lookup (using the stream-side 
record timestamp) if the table is versioned
 * table-table joins, including foreign key joins, will not produce new join 
results on out-of-order records (by key) from versioned tables
 * table filters will disable the existing optimization to not send duplicate 
tombstones when applied to a versioned table
 * table aggregations will ignore out-of-order records when aggregating a 
versioned table

  was:
With the introduction of versioned state stores in 
[KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
 we should leverage them to provide improved join semantics. 

As described in 
[KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores],
 we will make the following two improvements:
 * stream-table joins will perform a timestamped lookup (using the stream-side 
record timestamp) if the table is materialized with a versioned store
 * table-table joins, including foreign key joins, will not produce new join 
results on out-of-order records (by key) from tables materialized with 
versioned stores

Summary: Improved processor semantics for versioned stores  (was: 
Improved stream-table and table-table join semantics for versioned stores)

> Improved processor semantics for versioned stores
> -
>
> Key: KAFKA-14834
> URL: https://issues.apache.org/jira/browse/KAFKA-14834
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
>  Labels: kip, streams
>
> With the introduction of versioned state stores in 
> [KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores],
>  we should leverage them to provide improved join semantics. 
> As described in 
> [KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores],
>  we will make the following four improvements:
>  * stream-table joins will perform a timestamped lookup (using the 
> stream-side record timestamp) if the table is versioned
>  * table-table joins, including foreign key joins, will not produce new join 
> results on out-of-order records (by key) from versioned tables
>  * table filters will disable the existing optimization to not send duplicate 
> tombstones when applied to a versioned table
>  * table aggregations will ignore out-of-order records when aggregating a 
> versioned table



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191919


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {
+/**
+ * @return the underlying persistent map
+ */
+Object underlying();
+
+/**
+ * @param key the key
+ * @param value the value
+ * @return a wrapped persistent map that differs from this one in that the 
given mapping is added (if necessary)
+ */
+ImmutableMap updated(K key, V value);

Review Comment:
   I was trying to avoid inventing a new naming convention and hence went with 
what `Scala` had chosen. If we have another convention we like more that there 
is a precedent for, happy to discuss.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191307


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {

Review Comment:
   I think this looks pretty clean now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


ijuma commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191307


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {

Review Comment:
   I think this looks pretty clean.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162184665


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {
+/**
+ * @return the underlying persistent map
+ */
+Object underlying();

Review Comment:
   I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


rondagostino commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162184505


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {

Review Comment:
   I updated it.  I removed the factory in favor of static methods on the 
interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing

2023-04-10 Thread via GitHub


rondagostino commented on PR #13532:
URL: https://github.com/apache/kafka/pull/13532#issuecomment-1502480192

   Test failures are unrelated.  
`MirrorConnectorsWithCustomForwardingAdminIntegrationTest` passed locally.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14887) ZK session timeout can cause broker to shutdown

2023-04-10 Thread Ron Dagostino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ron Dagostino reassigned KAFKA-14887:
-

Assignee: Ron Dagostino

> ZK session timeout can cause broker to shutdown
> ---
>
> Key: KAFKA-14887
> URL: https://issues.apache.org/jira/browse/KAFKA-14887
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 
> 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 3.1.0, 2.7.1, 2.8.0, 
> 2.7.0
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>
> We have the following code in FinalizedFeatureChangeListener.scala which will 
> exit regardless of the type of exception that is thrown when trying to 
> process feature changes:
> case e: Exception => {
>   error("Failed to process feature ZK node change event. The broker 
> will eventually exit.", e)
>   throw new FatalExitError(1)
> The issue here is that this does not distinguish between exceptions caused by 
> an inability to process a feature change and an exception caused by a 
> ZooKeeper session timeout.  We want to shut the broker down for the former 
> case, but we do NOT want to shut the broker down in the latter case; the 
> ZooKeeper session will eventually be reestablished, and we can continue 
> processing at that time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-04-10 Thread via GitHub


philipnee commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1162151428


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -447,6 +482,19 @@ public void close() {
 
 @Override
 public void close(Duration timeout) {
+if (timeout.toMillis() < 0)
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+try {
+if (!closed) {
+close(timeout, false);
+}
+} finally {
+closed = true;

Review Comment:
   actually that's a good point. I did this because it was the original 
implementation and I didn't put too much thinking into this.  For the current 
threading model, we definitely want to avoid multiple thread trying to close 
the client.  Can we synchronize on this method to prevent multithreading access?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax opened a new pull request, #13534: KAFKA-14054: Handle TimeoutException gracefully

2023-04-10 Thread via GitHub


mjsax opened a new pull request, #13534:
URL: https://github.com/apache/kafka/pull/13534

   We incorrectly assumed, that `consumer.position()` should always be served 
by the consumer locally set position.
   
   However, within `commitNeeded()` we check if first `if(commitNeeded)` and 
thus go into the else only if we have not processed data (otherwise, 
`commitNeeded` would be true). For this reason, we actually don't know if the 
consumer has a valid position or not.
   
   We should just swallow a timeout if the consumer cannot get the position 
from the broker, and try the next partition. If any position advances, we can 
return true, and if we timeout for all partitions we can return false.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502416844

   > Yes, updated would only make sense on the map interface. For the Set 
interface, added seems reasonable.
   
   Sorry to bikeshed this again but I kinda like `withAdded`, `withRemoval`, 
etc. Curious how you feel about those.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502415755

   @ijuma @rondagostino : I feel like "persistent" should appear somewhere in 
the class names here. Perhaps you're right that it doesn't need to be in the 
short class name, but can we put the classes in a namespace that includes that 
word? So something like `org.apache.kafka.server.persistent`? Then we'd have 
`org.apache.kafka.server.persistent.ImmutableMap`, etc.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162134827


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {
+/**
+ * @return the underlying persistent map
+ */
+Object underlying();
+
+/**
+ * @param key the key
+ * @param value the value
+ * @return a wrapped persistent map that differs from this one in that the 
given mapping is added (if necessary)
+ */
+ImmutableMap updated(K key, V value);

Review Comment:
   how do you guys feel about `withUpdate` as a name?



##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {
+/**
+ * @return the underlying persistent map
+ */
+Object underlying();
+
+/**
+ * @param key the key
+ * @param value the value
+ * @return a wrapped persistent map that differs from this one in that the 
given mapping is added (if necessary)
+ */
+ImmutableMap updated(K key, V value);
+
+/**
+ * @param key the key
+ * @return a wrapped persistent map that differs from this one in that the 
given mapping is removed (if necessary)
+ */
+ImmutableMap removed(K key);

Review Comment:
   how do you guys feel about `withRemoval` as a name?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup

2023-04-10 Thread via GitHub


vvcephei commented on code in PR #13490:
URL: https://github.com/apache/kafka/pull/13490#discussion_r1162130597


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -447,6 +482,19 @@ public void close() {
 
 @Override
 public void close(Duration timeout) {
+if (timeout.toMillis() < 0)
+throw new IllegalArgumentException("The timeout cannot be 
negative.");
+try {
+if (!closed) {
+close(timeout, false);
+}
+} finally {
+closed = true;

Review Comment:
   It's interesting to set this only after the call to close completes or fails.
   
   I could see setting it only after it completes to ensure it's really closed, 
or setting it with a CAS before calling the inner close so that multiple 
callers don't all try to close at once.
   
   I'm not sure I see the rationale of doing it this way, though. If there is a 
reason I'm not seeing, could you add a comment explaining it so the code will 
be clear?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -319,26 +328,52 @@ public Map 
committed(final Set
+ * If the timeout specified by {@code default.api.timeout.ms} expires
+ * {@link org.apache.kafka.common.errors.TimeoutException} is thrown.
+ *
+ * @param partitions The partition to check
+ * @param timeout The maximum time to block.
+ * @return The last committed offset and metadata or null if there was no 
prior commit
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link 
#wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the 
calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if 
authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not 
authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other 
unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the 
committed offset cannot be found before
+ * the timeout specified by {@code default.api.timeout.ms} 
expires.
+ */
 @Override
 public Map committed(final 
Set partitions,
 final Duration 
timeout) {
+maybeWakeup();
 maybeThrowInvalidGroupIdException();
+
 if (partitions.isEmpty()) {
 return new HashMap<>();
 }
 
 final OffsetFetchApplicationEvent event = new 
OffsetFetchApplicationEvent(partitions);
+activeFutures.add(event.future());
 eventHandler.add(event);
 try {
-return event.complete(Duration.ofMillis(100));
+
+return event.complete(timeout);
+} catch (ExecutionException e) {
+throw new KafkaException(e);
 } catch (InterruptedException e) {
-throw new InterruptException(e);
+throw new InterruptException(e.getMessage());

Review Comment:
   What's up with this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162131601


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {

Review Comment:
   I don't feel very strongly about this either way. I like the elegance of 
having a separate factory instead of static functions, but I do wonder if that 
will make it harder to do a quick replacement of the current implementation 
with the Vavr implementation (or whatever) and rerun the JMH benchmarks without 
changing anything else. Maybe it's fine as-is, though?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162131601


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {

Review Comment:
   I don't feel strongly about it, but I agree that from a practical point of 
view it might be easier to have a factory method. Then you could just replace 
that factory method implementation with the Vavr implementation (or whatever) 
and rerun the JMH benchmarks without changing anything else.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162129955


##
server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.immutable;
+
+import java.util.Map;
+
+/**
+ * A persistent Hash-based Map wrapper.
+ * java.util.Map methods that mutate in-place will throw 
UnsupportedOperationException
+ *
+ * @param  the key type
+ * @param  the value type
+ */
+public interface ImmutableMap extends Map {
+/**
+ * @return the underlying persistent map
+ */
+Object underlying();

Review Comment:
   Yeah, I think it would be good to avoid this unless there is a concrete 
reason we need it...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162128357


##
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
 }
 
 public void write(ImageWriter writer, ImageWriterOptions options) {
-for (TopicImage topicImage : topicsById.values()) {
-topicImage.write(writer, options);
+for (Map.Entry entry : topicsById.entrySet()) {
+entry.getValue().write(writer, options);

Review Comment:
   The `java.util.Collection` returned by `java.util.Map#values` is pretty 
basic. I can't find any comment about how its equals method is supposed to 
work. For `HashMap#values`, it seems to just be doing reference equality. (In 
contrast, `Map#keySet` returns an actual Set which is compared how you would 
expect.)
   
   @rondagostino, like you, I wrote a test program with 
`java.util.HashMap` and `java.util.HashSet` and got these 
very similar results
   ```
   foo = {a->a, b->b)
   bar = {a, b}
   foo.values().equals(foo.values()) = true
   new HashSet<>(foo.values()).equals(bar) = true
   foo.values().equals(bar) = false
   bar.equals(foo.values()) = false
   foo.keySet().equals(bar) = true
   bar.equals(foo.keySet()) = true
   ```
   
   > We could, but it is marked deprecated in the library because there is no 
way to provide a reasonable .equals() method. I actually checked, and indeed it 
is true:
   
   What version of the pcollections source code were you looking at? I 
downloaded the source from https://github.com/hrldcpr/pcollections and wasn't 
able to find any comment or deprecated indicator for `HashPMap#values()`. In 
fact, it looks like it simply inherits the `AbstractMap#values` implementation 
without any changes. I suspect that this will actually implement reference 
equality, since this implementation saves the Collection object it creates in a 
`transient` field (ew)
   
   But even leaving that aside, I can't find any API guarantees about what the 
equals method of the collection returned by `Map#values` is supposed to do. 
It's possible that this is just undefined. At any rate the existing behavior of 
the java.util.Map subclasses is certainly useless here (it will not be what 
anyone expects)
   
   `Collection#equals` says you can do whatever you want for `equals`, but you 
should "exercise care". Zooming out a bit, the big picture is that interfaces 
like List or Set define a reasonable behavior for equals, whereas Collection 
(which is a parent interface for both) is just like :shrug: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162128357


##
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##
@@ -76,8 +84,8 @@ public TopicImage getTopic(String name) {
 }
 
 public void write(ImageWriter writer, ImageWriterOptions options) {
-for (TopicImage topicImage : topicsById.values()) {
-topicImage.write(writer, options);
+for (Map.Entry entry : topicsById.entrySet()) {
+entry.getValue().write(writer, options);

Review Comment:
   The `java.util.Collection` returned by `java.util.Map#values` is pretty 
basic. I can't find any comment about how its equals method is supposed to 
work. For `HashMap#values`, it seems to just be doing reference equality. (In 
contrast, `Map#keySet` returns an actual Set which is compared how you would 
expect.)
   
   @rondagostino, like you, I wrote a test program with 
`java.util.HashMap` and `java.util.HashSet` and got these 
very similar results
   ```
   foo = {a->a, b->b)
   bar = {a, b}
   foo.values().equals(foo.values()) = true
   new HashSet<>(foo.values()).equals(bar) = true
   foo.values().equals(bar) = false
   bar.equals(foo.values()) = false
   foo.keySet().equals(bar) = true
   bar.equals(foo.keySet()) = true
   ```
   
   > We could, but it is marked deprecated in the library because there is no 
way to provide a reasonable .equals() method. I actually checked, and indeed it 
is true:
   
   What version of the pcollections source code were you looking at? I 
downloaded the source from https://github.com/hrldcpr/pcollections and wasn't 
able to find any comment or deprecated indicator for `HashPMap#values()`. In 
fact, it looks like it simply inherits the `AbstractMap#values` implementation 
without any changes. I suspect that this will actually implement reference 
equality, since this implementation saves the Collection object it creates in a 
`transient` field (ew)
   
   But even leaving that aside, I can't find any API guarantees about what the 
equals method of the collection returned by `Map#values` is supposed to do. 
It's possible that this is just undefined. At any rate the existing behavior of 
the java.util.Map subclasses is certainly useless here (it will not be what 
anyone expects)
   
   `Collection#equals` says you can do whatever you want for `equals`, but you 
should "exercise care". Zooming out a bit, the big picture is that interfaces 
like List or Set define a reasonable behavior for equals, whereas Collection 
(which is the base class for both) is just like #:shrug: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14054) Unexpected client shutdown as TimeoutException is thrown as IllegalStateException

2023-04-10 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14054:
---

Assignee: Matthias J. Sax

> Unexpected client shutdown as TimeoutException is thrown as 
> IllegalStateException
> -
>
> Key: KAFKA-14054
> URL: https://issues.apache.org/jira/browse/KAFKA-14054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.2.0, 3.1.1
>Reporter: Donald
>Assignee: Matthias J. Sax
>Priority: Major
>
>  Re: 
> https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2
> 1) TimeoutException is thrown as IllegalStateException in 
> {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}. 
> Which causes the client to shutdown in 
> {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}.
> 2) Should Timeout be a recoverable error which is expected to be handled by 
> User?
> 3) This issue is exposed by change KAFKA-12887 which was introduced in 
> kafka-streams ver 3.1.0
> *code referenced*
> {code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
> public boolean commitNeeded() {
> if (commitNeeded) {
> return true;
> } else {
> for (final Map.Entry entry : 
> consumedOffsets.entrySet()) {
> final TopicPartition partition = entry.getKey();
> try {
> final long offset = mainConsumer.position(partition);
> if (offset > entry.getValue() + 1) {
> commitNeeded = true;
> entry.setValue(offset - 1);
> }
> } catch (final TimeoutException error) {
> // the `consumer.position()` call should never block, 
> because we know that we did process data
> // for the requested partition and thus the consumer 
> should have a valid local position
> // that it can return immediately
> // hence, a `TimeoutException` indicates a bug and thus 
> we rethrow it as fatal `IllegalStateException`
> throw new IllegalStateException(error);
> } catch (final KafkaException fatal) {
> throw new StreamsException(fatal);
> }
> }
> return commitNeeded;
> }
> }
> {code}
> {code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
> private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
> getActionForThrowable(final Throwable throwable,
>   
>   final StreamsUncaughtExceptionHandler 
> streamsUncaughtExceptionHandler) {
> final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
> action;
> if (wrappedExceptionIsIn(throwable, 
> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
> action = SHUTDOWN_CLIENT;
> } else {
> action = streamsUncaughtExceptionHandler.handle(throwable);
> }
> return action;
> }
> private void handleStreamsUncaughtException(final Throwable throwable,
> final 
> StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
> final boolean 
> skipThreadReplacement) {
> final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
> action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
> if (oldHandler) {
> log.warn("Stream's new uncaught exception handler is set as well 
> as the deprecated old handler." +
> "The old handler will be ignored as long as a new handler 
> is set.");
> }
> switch (action) {
> case REPLACE_THREAD:
> if (!skipThreadReplacement) {
> log.error("Replacing thread in the streams uncaught 
> exception handler", throwable);
> replaceStreamThread(throwable);
> } else {
> log.debug("Skipping thread replacement for recoverable 
> error");
> }
> break;
> case SHUTDOWN_CLIENT:
> log.error("Encountered the following exception during 
> processing " +
> "and Kafka Streams opted to " + action + "." +
> " The streams client is going to shut down now. ", 
> throwable);
> closeToError();
> break;
> {code}
>  *Stacktrace*
> {code:java|title=error log kafka-

[GitHub] [kafka] mjsax commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-10 Thread via GitHub


mjsax commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1502392008

   @cadonna -- Should we cherry-pick to 3.4 branch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-04-10 Thread via GitHub


mjsax commented on PR #10747:
URL: https://github.com/apache/kafka/pull/10747#issuecomment-1502379032

   Follow up PR to user varint encoding: 
https://github.com/apache/kafka/pull/13533


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax opened a new pull request, #13533: KAFKA-12446: update change encoding to use varint

2023-04-10 Thread via GitHub


mjsax opened a new pull request, #13533:
URL: https://github.com/apache/kafka/pull/13533

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic

2023-04-10 Thread via GitHub


jeffkbkim commented on code in PR #13476:
URL: https://github.com/apache/kafka/pull/13476#discussion_r1161913599


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
+import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment;
+import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
+import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
+import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
+import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord;
+import static 
org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
+
+/**
+ * The GroupMetadataManager manages the metadata of all generic and consumer 
groups. It holds
+ * the hard and the soft state of the groups. This class has two kinds of 
methods:

Re

[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

2023-04-10 Thread via GitHub


guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1162110812


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -988,6 +1005,14 @@ private void prepareChangelogs(final Map 
tasks,
 } catch (final Exception e) {
 throw new StreamsException("State restore listener failed 
on batch restored", e);
 }
+
+final TaskId taskId = changelogMetadata.stateManager.taskId();
+final Task task = tasks.get(taskId);
+// if the log is truncated between when we get the log end 
offset and when we get the
+// consumer position, then it's possible that the difference 
become negative and there's actually
+// no records to restore; in this case we just initialize the 
sensor to zero

Review Comment:
   I was thinking about the case where the consumer's position is set as the 
checkpointed offset, whereas the log end offset was truncated after the 
checkpoint was written. Maybe I did not make that clear in the above comment? 
Will reword a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162109668


##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -354,6 +361,7 @@ class RequestChannel(val queueSize: Int,
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
   val responseQueueSizeMetricName = 
metricNamePrefix.concat(ResponseQueueSizeMetric)
+  private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)

Review Comment:
   Let's do in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

2023-04-10 Thread via GitHub


guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1162107892


##
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##
@@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final 
boolean stateUpdaterEnab
 kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
 kafkaStreams.pause();
 kafkaStreams.start();
-waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
+waitForApplicationState(singletonList(kafkaStreams), 
State.REBALANCING, STARTUP_TIMEOUT);
 assertTrue(kafkaStreams.isPaused());
 
 produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
 
-waitUntilStreamsHasPolled(kafkaStreams, 2);

Review Comment:
   Sorry my bad, it should still be there, will add back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13280:
URL: https://github.com/apache/kafka/pull/13280#discussion_r1162107641


##
metadata/src/main/java/org/apache/kafka/image/TopicsImage.java:
##
@@ -21,41 +21,48 @@
 import org.apache.kafka.image.writer.ImageWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.immutable.ImmutableMap;
+import org.apache.kafka.server.immutable.ImmutableMapSetFactory;
 import org.apache.kafka.server.util.TranslatedValueMapView;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-
 /**
  * Represents the topics in the metadata image.
  *
  * This class is thread-safe.
  */
 public final class TopicsImage {
-public static final TopicsImage EMPTY =
-new TopicsImage(Collections.emptyMap(), Collections.emptyMap());
+private static final ImmutableMapSetFactory FACTORY = 
ImmutableMapSetFactory.PCOLLECTIONS_FACTORY;
+
+public static final TopicsImage EMPTY =  new 
TopicsImage(FACTORY.emptyMap(), FACTORY.emptyMap());
 
-private final Map topicsById;
-private final Map topicsByName;
+final ImmutableMap topicsById;

Review Comment:
   I realize that this is immutable and very unlikely to ever be computed on 
the fly, but I would still really like to avoid direct field access. If we 
start doing stuff like this, people will tend to copy the pattern where it can 
cause harm (for example, where the map is mutable, or where we might want to 
compute the value on the fly). And there is no benefit here, so let's avoid it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader

2023-04-10 Thread via GitHub


guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1162102863


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -880,6 +896,9 @@ private void initializeChangelogs(final Map 
tasks,
 }
 
 private void addChangelogsToRestoreConsumer(final Set 
partitions) {
+if (partitions.isEmpty())

Review Comment:
   Yes that's right, but as long as `partitions.isEmpty()` then we could 
actually skip the whole func, so I made it applied at larger scope.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13513:
URL: https://github.com/apache/kafka/pull/13513#discussion_r1162102531


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -458,8 +458,9 @@ public void run() throws Exception {
 zkMigrationClient.readAllMetadata(batch -> {
 try {
 if (log.isTraceEnabled()) {
-log.trace("Migrating {} records from ZK: {}", 
batch.size(), recordBatchToString(batch));
+log.info("Migrating {} records from ZK: {}", 
batch.size(), recordBatchToString(batch));

Review Comment:
   It looks like you changed this for the purpose of debugging and forgot to 
change it back. BTW you can just change the test log4j.properties file to avoid 
this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-10 Thread via GitHub


cmccabe commented on code in PR #13513:
URL: https://github.com/apache/kafka/pull/13513#discussion_r1162102172


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -458,8 +458,9 @@ public void run() throws Exception {
 zkMigrationClient.readAllMetadata(batch -> {
 try {
 if (log.isTraceEnabled()) {
-log.trace("Migrating {} records from ZK: {}", 
batch.size(), recordBatchToString(batch));
+log.info("Migrating {} records from ZK: {}", 
batch.size(), recordBatchToString(batch));
 } else {
+log.info("Migrating {} records from ZK: {}", 
batch.size(), recordBatchToString(batch));

Review Comment:
   This looks like a typo. Why is this the same as in the trace-enabled case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration

2023-04-10 Thread via GitHub


cmccabe commented on PR #13513:
URL: https://github.com/apache/kafka/pull/13513#issuecomment-1502329503

   > Should I move ScramCredentialData.java from 
metadata/src/main/java/org/apache/kafka/image to 
metadata/src/main/java/org/apache/kafka/metadata ?
   
   yes, please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino opened a new pull request, #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing

2023-04-10 Thread via GitHub


rondagostino opened a new pull request, #13532:
URL: https://github.com/apache/kafka/pull/13532

   `FinalizedFeatureChangeListener` shuts the broker down when it encounters an 
issue trying to process feature change events, but it does not distinguish 
between issues related to feature changes actually failing and other exceptions 
like ZooKeeper session expiration.  This introduces the possibility that 
Zookeeper session expiration could cause the broker to shutdown, which is not 
intended.  This patch updates the code to distinguish between these two types 
of exceptions and in the case of something like a ZK session expiration it logs 
a warning and continues.  We shutdown the broker only for 
`FeatureCacheUpdateException`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14887) ZK session timeout can cause broker to shutdown

2023-04-10 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-14887:
-

 Summary: ZK session timeout can cause broker to shutdown
 Key: KAFKA-14887
 URL: https://issues.apache.org/jira/browse/KAFKA-14887
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 
3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 3.1.0, 2.7.1, 2.8.0, 
2.7.0
Reporter: Ron Dagostino


We have the following code in FinalizedFeatureChangeListener.scala which will 
exit regardless of the type of exception that is thrown when trying to process 
feature changes:

case e: Exception => {
  error("Failed to process feature ZK node change event. The broker 
will eventually exit.", e)
  throw new FatalExitError(1)

The issue here is that this does not distinguish between exceptions caused by 
an inability to process a feature change and an exception caused by a ZooKeeper 
session timeout.  We want to shut the broker down for the former case, but we 
do NOT want to shut the broker down in the latter case; the ZooKeeper session 
will eventually be reestablished, and we can continue processing at that time.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-04-10 Thread via GitHub


mjsax commented on PR #10747:
URL: https://github.com/apache/kafka/pull/10747#issuecomment-1502280780

   Thanks for the quick turn around time! Merged. We can still do a follow up 
on the open question with regard to the serialization format. Using 
`Bytes.putInt()` should actually do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-04-10 Thread via GitHub


mjsax merged PR #10747:
URL: https://github.com/apache/kafka/pull/10747


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-04-10 Thread via GitHub


mjsax commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1162058460


##
docs/streams/upgrade-guide.html:
##
@@ -75,6 +75,20 @@ <
 changelogs.
 
 
+
+Downgrading from 3.5.x or newer version to 3.4.x or older version 
needs special attention:
+Since 3.5.0 release, Kafka Streams uses a new serialization format for 
repartition topics.
+This means that older versions of Kafka Streams would not be able to 
recognize the bytes written by newer versions,
+and hence it is harder to downgrade Kafka Streams with version 3.5.0 
or newer to older versions in-flight. For
+more details, please refer to https://cwiki.apache.org/confluence/x/P5VbDg";>KIP-904.
+
+For a downgrade, first switch the config from 
"upgrade.from" to the version you are downgrading to.
+This disables writing of the new serialization format in your 
application. It's important to wait in this state
+log enough to make sure that the application has finished processing 
any "in-flight" messages written

Review Comment:
   ```suggestion
   long enough to make sure that the application has finished 
processing any "in-flight" messages written
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-04-10 Thread via GitHub


mjsax commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1162056271


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java:
##
@@ -76,6 +79,48 @@ public boolean enableSendingOldValues(final boolean 
forceMaterialization) {
 
 private class KTableMapProcessor extends ContextualProcessor, 
K1, Change> {
 
+private boolean isNotUpgrade;
+
+@SuppressWarnings("checkstyle:cyclomaticComplexity")
+private boolean isNotUpgrade(final Map configs) {

Review Comment:
   Maybe the inner class would need to be `static`, too? Not sure. Also not too 
important.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same

2023-04-10 Thread via GitHub


mjsax commented on code in PR #10747:
URL: https://github.com/apache/kafka/pull/10747#discussion_r1162055509


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java:
##
@@ -45,34 +49,87 @@ public void setIfUnset(final SerdeGetter getter) {
 }
 }
 
+@SuppressWarnings("checkstyle:cyclomaticComplexity")
+private boolean isUpgrade(final Map configs) {
+final Object upgradeFrom = 
configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+if (upgradeFrom == null) {
+return false;
+}
+
+switch ((String) upgradeFrom) {
+case StreamsConfig.UPGRADE_FROM_0100:
+case StreamsConfig.UPGRADE_FROM_0101:
+case StreamsConfig.UPGRADE_FROM_0102:
+case StreamsConfig.UPGRADE_FROM_0110:
+case StreamsConfig.UPGRADE_FROM_10:
+case StreamsConfig.UPGRADE_FROM_11:
+case StreamsConfig.UPGRADE_FROM_20:
+case StreamsConfig.UPGRADE_FROM_21:
+case StreamsConfig.UPGRADE_FROM_22:
+case StreamsConfig.UPGRADE_FROM_23:
+case StreamsConfig.UPGRADE_FROM_24:
+case StreamsConfig.UPGRADE_FROM_25:
+case StreamsConfig.UPGRADE_FROM_26:
+case StreamsConfig.UPGRADE_FROM_27:
+case StreamsConfig.UPGRADE_FROM_28:
+case StreamsConfig.UPGRADE_FROM_30:
+case StreamsConfig.UPGRADE_FROM_31:
+case StreamsConfig.UPGRADE_FROM_32:
+case StreamsConfig.UPGRADE_FROM_33:
+case StreamsConfig.UPGRADE_FROM_34:
+return true;
+default:
+return false;
+}
+}
+
+@Override
+public void configure(final Map configs, final boolean isKey) {
+this.isUpgrade = isUpgrade(configs);
+}
+
 /**
  * @throws StreamsException if both old and new values of data are null, 
or if
- * both values are not null
+ * both values are not null and is upgrading from a version less than 3.4
  */
 @Override
 public byte[] serialize(final String topic, final Headers headers, final 
Change data) {
-final byte[] serializedKey;
-
-// only one of the old / new values would be not null
-if (data.newValue != null) {
-if (data.oldValue != null) {
+final boolean oldValueIsNull = data.oldValue == null;
+final boolean newValueIsNull = data.newValue == null;
+
+final byte[] newData = inner.serialize(topic, headers, data.newValue);
+final byte[] oldData = inner.serialize(topic, headers, data.oldValue);
+
+final int newDataLength = newValueIsNull ? 0 : newData.length;
+final int oldDataLength = oldValueIsNull ? 0 : oldData.length;
+
+// The serialization format is:
+// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
+// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
+// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY 
oldValue}{BYTE newOldFlag=2}
+final ByteBuffer buf;
+if (!newValueIsNull && !oldValueIsNull) {
+if (isUpgrade) {
 throw new StreamsException("Both old and new values are not 
null (" + data.oldValue
-+ " : " + data.newValue + ") in ChangeSerializer, which is 
not allowed.");
++ " : " + data.newValue + ") in ChangeSerializer, 
which is not allowed unless upgrading.");
+} else {
+final int capacity = UINT32_SIZE + newDataLength + 
oldDataLength + NEW_OLD_FLAG_SIZE;
+buf = ByteBuffer.allocate(capacity);
+ByteUtils.writeUnsignedInt(buf, newDataLength);

Review Comment:
   I would actually recommend to just remove it from the KIP -- it seem to be 
an implementation detail. \cc @guozhangwang 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162049144


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+nodesToTransactions.synchronized {
+  // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+new TransactionDataAndCallbacks(
+  new AddPartitionsToTxnTransactionCollection(1),
+  mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+  val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+  // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+  // reconnected so return the retriable network exception.
+  if (currentTransactionData != null) {
+val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+  Errors.INVALID_PRODUCER_EPOCH
+else 
+  Errors.NETWORK_EXCEPTION
+val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
+currentTransactionData.topics().forEach { topic =>
+  topic.partitions().forEach { partition =>
+topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
+  }
+}
+val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
+currentNodeAndTransactionData.transactionData.remove(transactionData)
+oldCallback(topicPartitionsToError.toMap)
+  }
+  currentNodeAndTransactionData.transactionData.add(transactionData)
+  
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+  wakeup()
+}
+  }
+
+  private class AddPartitionsToTxnHandler(node: Node, 
transactionDataAndCallbacks: TransactionDataAndCallbacks) extends 
RequestCompletionHandler {
+override def onComplete(response: ClientResponse): Unit = {
+  // Note: Synchronization is not needed on inflightNodes since it is 
always accessed from this thread.
+  inflightNodes.remove(node)
+  if (response.authenticationException() != null) {
+error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} 
with an " +
+  "authentication exception.", response.authenticationException)
+transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) 
=>
+  callback(buildErrorMap(txnId, 
transactionDataAndCallbacks.transactionData, 
Errors.forException

[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162047588


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
   completeShutdown()
   return
 
+case callback: RequestChannel.CallbackRequest =>
+  try {
+val originalRequest = callback.originalRequest
+
+// If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+// new dequeue time. This will allow calculation of multiple 
callback times.
+// Otherwise, set dequeue time to now.
+if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
+  val prevCallbacksTimeNanos = 
originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - 
originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
+  originalRequest.callbackRequestCompleteTimeNanos = None
+  originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds() - prevCallbacksTimeNanos)
+} else {
+  originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds())
+}
+
+currentRequest.set(originalRequest)
+callback.fun()
+if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)
+  originalRequest.callbackRequestCompleteTimeNanos = 
Some(time.nanoseconds())
+  } catch {
+case e: FatalExitError =>
+  completeShutdown()
+  Exit.exit(e.statusCode)
+case e: Throwable => error("Exception when handling request", e)
+  } finally {
+currentRequest.remove()
+  }
+
 case request: RequestChannel.Request =>
   try {
 request.requestDequeueTimeNanos = endTime
 trace(s"Kafka request handler $id on broker $brokerId handling 
request $request")
+currentRequest.set(request)
 apis.handle(request, requestLocal)
   } catch {
 case e: FatalExitError =>
   completeShutdown()
   Exit.exit(e.statusCode)
 case e: Throwable => error("Exception when handling request", e)
   } finally {
+currentRequest.remove()
 request.releaseBuffer()
   }
 
+case RequestChannel.WakeupRequest => // We should handle this in 
receiveRequest by polling callbackQueue.

Review Comment:
   We can add a warning log.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162047387


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
   completeShutdown()
   return
 
+case callback: RequestChannel.CallbackRequest =>
+  try {
+val originalRequest = callback.originalRequest
+
+// If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+// new dequeue time. This will allow calculation of multiple 
callback times.
+// Otherwise, set dequeue time to now.
+if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) {
+  val prevCallbacksTimeNanos = 
originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - 
originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L)
+  originalRequest.callbackRequestCompleteTimeNanos = None
+  originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds() - prevCallbacksTimeNanos)
+} else {
+  originalRequest.callbackRequestDequeueTimeNanos = 
Some(time.nanoseconds())
+}
+
+currentRequest.set(originalRequest)
+callback.fun()
+if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty)

Review Comment:
   It is not true if we returned a response. We also update the value there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162046736


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int,
   completeShutdown()
   return
 
+case callback: RequestChannel.CallbackRequest =>
+  try {
+val originalRequest = callback.originalRequest
+
+// If we've already executed a callback for this request, reset 
the times and subtract the callback time from the 
+// new dequeue time. This will allow calculation of multiple 
callback times.

Review Comment:
   Artem requested this. See comment here. 
https://github.com/apache/kafka/pull/13391#discussion_r1160849589
   
   There is currently not a way to prevent infinite callbacks. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162045551


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -35,6 +36,43 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
 }
 
+object KafkaRequestHandler {
+  // Support for scheduling callbacks on a request thread.
+  private val threadRequestChannel = new ThreadLocal[RequestChannel]
+  private val currentRequest = new ThreadLocal[RequestChannel.Request]
+
+  // For testing
+  private var bypassThreadCheck = false

Review Comment:
   We can make it volatile, but this is only really used in tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162045206


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+nodesToTransactions.synchronized {
+  // Check if we have already (either node or individual transaction). Add 
the Node if it isn't there.
+  val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+new TransactionDataAndCallbacks(
+  new AddPartitionsToTxnTransactionCollection(1),
+  mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
+
+  val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+
+  // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
+  // reconnected so return the retriable network exception.
+  if (currentTransactionData != null) {
+val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+  Errors.INVALID_PRODUCER_EPOCH
+else 
+  Errors.NETWORK_EXCEPTION

Review Comment:
   ^ ditto comment about retriable errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162044614


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -616,66 +619,128 @@ class ReplicaManager(val config: KafkaConfig,
 responseCallback: Map[TopicPartition, PartitionResponse] 
=> Unit,
 delayedProduceLock: Option[Lock] = None,
 recordConversionStatsCallback: Map[TopicPartition, 
RecordConversionStats] => Unit = _ => (),
-requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
+requestLocal: RequestLocal = RequestLocal.NoCaching,
+transactionalId: String = null,
+transactionStatePartition: Option[Int] = None): Unit = {
 if (isValidRequiredAcks(requiredAcks)) {
   val sTime = time.milliseconds
-  val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
-origin, entriesPerPartition, requiredAcks, requestLocal)
-  debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-
-  val produceStatus = localProduceResults.map { case (topicPartition, 
result) =>
-topicPartition -> ProducePartitionStatus(
-  result.info.lastOffset + 1, // required offset
-  new PartitionResponse(
-result.error,
-result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
-result.info.logAppendTime,
-result.info.logStartOffset,
-result.info.recordErrors,
-result.info.errorMessage
+  
+  val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
+if (transactionStatePartition.isEmpty || 
!config.transactionPartitionVerificationEnable)
+  (entriesPerPartition, Map.empty)
+else
+  entriesPerPartition.partition { case (topicPartition, records) =>
+
getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId())
+  }
+
+  def appendEntries(allEntries: Map[TopicPartition, 
MemoryRecords])(unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
+val verifiedEntries = 
+  if (unverifiedEntries.isEmpty) 
+allEntries 
+  else
+allEntries.filter { case (tp, _) =>
+  !unverifiedEntries.contains(tp)
+}
+
+val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
+  origin, verifiedEntries, requiredAcks, requestLocal)
+debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
+
+val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
+  // NOTE: Older clients return INVALID_RECORD, but newer clients will 
return INVALID_TXN_STATE
+  val message = if (error.equals(Errors.INVALID_RECORD)) "Partition 
was not added to the transaction" else error.message()
+  topicPartition -> LogAppendResult(
+LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+Some(error.exception(message))
   )
-) // response status
-  }
+}
+
+val allResults = localProduceResults ++ unverifiedResults
+
+val produceStatus = allResults.map { case (topicPartition, result) =>
+  topicPartition -> ProducePartitionStatus(
+result.info.lastOffset + 1, // required offset
+new PartitionResponse(
+  result.error,
+  result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+  result.info.logAppendTime,
+  result.info.logStartOffset,
+  result.info.recordErrors,
+  result.info.errorMessage
+)
+  ) // response status
+}
 
-  actionQueue.add {
-() =>
-  localProduceResults.foreach {
-case (topicPartition, result) =>
-  val requestKey = TopicPartitionOperationKey(topicPartition)
-  result.info.leaderHwChange match {
-case LeaderHwChange.INCREASED =>
-  // some delayed operations may be unblocked after HW changed
-  delayedProducePurgatory.checkAndComplete(requestKey)
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-  delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.SAME =>
-  // probably unblock some follower fetch requests since log 
end offset has been updated
-  delayedFetchPurgatory.checkAndComplete(requestKey)
-case LeaderHwChange.NONE =>
+actionQueue.add {
+  () =>
+allResults.foreach {
+  case (topicPartition, result) =>
+val requestKey = TopicPartitionOperationKey(topicPartition)
+result.info.leaderHwChang

[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1162043968


##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -29,7 +29,7 @@
   // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 
and is still
   // under developement. Hence, the API is not exposed by default by brokers
   // unless explicitely enabled.
-  "latestVersionUnstable": true,
+  "latestVersionUnstable": false,

Review Comment:
   I will update the comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


junrao commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161909242


##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestCompletionHandler}
+import org.apache.kafka.common.{Node, TopicPartition}
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction,
 AddPartitionsToTxnTransactionCollection}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, 
AddPartitionsToTxnResponse}
+import org.apache.kafka.common.utils.Time
+
+import scala.collection.mutable
+
+object AddPartitionsToTxnManager {
+  type AppendCallback = Map[TopicPartition, Errors] => Unit
+}
+
+
+class TransactionDataAndCallbacks(val transactionData: 
AddPartitionsToTxnTransactionCollection,
+  val callbacks: mutable.Map[String, 
AddPartitionsToTxnManager.AppendCallback])
+
+
+class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, 
time: Time) 
+  extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + 
config.brokerId, client, config.requestTimeoutMs, time) {
+  
+  private val inflightNodes = mutable.HashSet[Node]()
+  private val nodesToTransactions = mutable.Map[Node, 
TransactionDataAndCallbacks]()
+  
+  def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
+nodesToTransactions.synchronized {

Review Comment:
   Our long term goal is to replace the scala code with java. Could we write 
this new class and the corresponding test in java?



##
clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json:
##
@@ -29,7 +29,7 @@
   // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 
and is still
   // under developement. Hence, the API is not exposed by default by brokers
   // unless explicitely enabled.
-  "latestVersionUnstable": true,
+  "latestVersionUnstable": false,

Review Comment:
   The above comment still says "is still under developement". Is the latest 
version indeed stable? Or should we change the comment accordingly?



##
core/src/main/scala/kafka/network/RequestChannel.scala:
##
@@ -354,6 +361,7 @@ class RequestChannel(val queueSize: Int,
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
   val responseQueueSizeMetricName = 
metricNamePrefix.concat(ResponseQueueSizeMetric)
+  private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)

Review Comment:
   This seems to be a more general mechanism than ActionQueue. Could we move 
all existing ActionQueue usage to callback queue and get rid of ActionQueue? 
This could be done in a separate PR.



##
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, 
RequestComp

[GitHub] [kafka] sql888 commented on pull request #12006: KAFKA-13794: Follow up to fix comparator

2023-04-10 Thread via GitHub


sql888 commented on PR #12006:
URL: https://github.com/apache/kafka/pull/12006#issuecomment-1502205546

   > Failed tests are unrelated
   > 
   > ```
   > Build / ARM / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   > ```
   
   
   How to fix this failed test?   getting this in M1 ARM Macbook.  Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] SpacRocket commented on pull request #13382: KAFKA-14722: Make BooleanSerde public

2023-04-10 Thread via GitHub


SpacRocket commented on PR #13382:
URL: https://github.com/apache/kafka/pull/13382#issuecomment-1502183087

   @mjsax Sure thanks for the suggestion. I've pushed a new commit in this pull 
request:
   https://github.com/apache/kafka/pull/13491


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-04-10 Thread Sergei Morozov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710259#comment-17710259
 ] 

Sergei Morozov commented on KAFKA-14750:


[~sagarrao] thank you for digging into this and providing the details. I would 
agree that a solution that will cause an overhead is not the best for this 
corner case. I'm not well familiar with the Kafka internals, so I don't 
understand why this issue is reproducible only during a mass topic deletion 
(which makes it a corner case). As a general idea, if a client fails to 
retrieve the position, and we know that at this point the topic may no longer 
exist, could we just refresh the cache on the client side via the admin client 
and only fail if the topic still exists?

> Sink connector fails if a topic matching its topics.regex gets deleted
> --
>
> Key: KAFKA-14750
> URL: https://issues.apache.org/jira/browse/KAFKA-14750
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.1
>Reporter: Sergei Morozov
>Assignee: Sagar Rao
>Priority: Major
>
> Steps to reproduce:
> # In {{{}config/connect-standalone.properties{}}}, set:
> {code:bash}
> plugin.path=libs/connect-file-3.3.1.jar
> {code}
> # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line 
> and add this one:
> {code:bash}
> topics.regex=connect-test-.*
> {code}
> # Start zookeeper:
> {code:bash}
> bin/zookeeper-server-start.sh config/zookeeper.properties
> {code}
> # Start the brokers:
> {code:bash}
> bin/kafka-server-start.sh config/server.properties
> {code}
> # Start the file sink connector:
> {code:bash}
> bin/connect-standalone.sh config/connect-standalone.properties 
> config/connect-file-sink.properties
> {code}
> # Create topics for the sink connector to subscribe to:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
> bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --create \
>         --topic connect-test-$j
>   done &
> done
> wait
> {code}
> # Wait until all the created topics are assigned to the connector. Check the 
> number of partitions to be > 0 in the output of:
> {code:bash}
> bin/kafka-consumer-groups.sh \
>     --bootstrap-server localhost:9092 \
>     --group connect-local-file-sink \
>     --describe --members
> {code}
> # Delete the created topics:
> {code:bash}
> for i in {0..2}; do
>   for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do
>     bin/kafka-topics.sh \
>         --bootstrap-server localhost:9092 \
>         --delete \
>         --topic connect-test-$j
>     echo Deleted topic connect-test-$j.
>   done &
> done
> wait
> {code}
> # Observe the connector fail with the following error:
> {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
> expired before the position for partition connect-test-211-0 could be 
> determined
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161946467


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##


Review Comment:
   Added a line to the ReplicaManager test to see that we return early on the 
error in the callback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-10 Thread via GitHub


guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1161940023


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection newActiveTasks = 
createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   Hmm.. that was not expected, did you check the trace and find out why the 
`onAssignment` was not triggered, in which it would set `rebalanceInProgress = 
true`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-04-10 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1161929533


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath(
 builder.useParallelExecutor();
 Reflections reflections = new InternalReflections(builder);
 
-return new PluginScanResult(
-getPluginDesc(reflections, SinkConnector.class, loader),
-getPluginDesc(reflections, SourceConnector.class, loader),
-getPluginDesc(reflections, Converter.class, loader),
-getPluginDesc(reflections, HeaderConverter.class, loader),
-getTransformationPluginDesc(loader, reflections),
-getPredicatePluginDesc(loader, reflections),
-getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-);
+ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   > I don't see a strong reason why it's not [static].
   
   It's non-static to make mocking easier. Rather than having to mock a static 
method of a class, you mock the Plugins instance, and stub out the loader 
swapping functionality.
   
   It appears that there are only a handful of places where 
compareAndSwapLoaders (and compareAndSwapWithDelegatingLoader) is used:
   * In DelegatingClassLoader, during initialization
   * In AbstractConnectCli and MirrorMaker to swap to the delegating classloader
   * In EmbeddedConnectCluster to swap back to the saved loader (KAFKA-12229)
   
   I think that the EmbeddedConnectCluster call-site is just a result of the 
open-ended delegating swaps. I'll refactor all of these call-sites to use 
LoaderSwap, and hide the more dangerous compareAndSwapLoaders now that only 
LoaderSwap is using it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13524: KIP-848-Interface changes

2023-04-10 Thread via GitHub


jeffkbkim commented on code in PR #13524:
URL: https://github.com/apache/kafka/pull/13524#discussion_r1161928058


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdToPartition.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.common;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+public class TopicIdToPartition {
+private final Uuid topicId;
+private final Integer partition;
+private final Optional> rackIds;
+
+public TopicIdToPartition(Uuid topicId, Integer topicPartition, 
Optional> rackIds) {
+this.topicId = Objects.requireNonNull(topicId, "topicId can not be 
null");
+this.partition = Objects.requireNonNull(topicPartition, 
"topicPartition can not be null");
+this.rackIds = rackIds;

Review Comment:
   this should also be not null. If rackIds is empty, we should pass in 
Optional.empty()



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161896585


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##


Review Comment:
   We do have tests from the previous PR that return errors if the partition is 
not added to the txn. See 
https://github.com/apache/kafka/commit/29a1a16668d76a1cc04ec9e39ea13026f2dce1de



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161874495


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -572,6 +572,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 result
   }
 
+  def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized {
+val entry = producerStateManager.activeProducers.get(producerId)
+entry != null && entry.currentTxnFirstOffset.isPresent

Review Comment:
   This is a Java map, so that doesn't work. I can convert to scala, but not 
sure that is much better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14886) Broker request handler thread pool is full due to single request slowdown

2023-04-10 Thread Haoze Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haoze Wu updated KAFKA-14886:
-
Description: 
In Kafka-2.8.0, we found that the number of data plane Kafka request handlers 
may quickly reach the limit when only one request is stuck. As a result, all 
other requests that require a data plane request handler will be stuck.

When there is a slowdown inside the storeOffsets function at line 777 due to 
I/O operation, the thread holds the lock acquired at line 754.
{code:java}
  private def doCommitOffsets(group: GroupMetadata,
                              memberId: String,
                              groupInstanceId: Option[String],
                              generationId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit): Unit = {
    group.inLock { // Line 754
..
      groupManager.storeOffsets() // Line 777
..
  }
} {code}
Its call stack is:
{code:java}
kafka.coordinator.group.GroupMetadata,inLock,227
kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755
kafka.server.KafkaApis,handleOffsetCommitRequest,515
kafka.server.KafkaApis,handle,175
kafka.server.KafkaRequestHandler,run,74
java.lang.Thread,run,748 {code}
This happens when the broker is handling the commit offset request from the 
consumer. When the slowdown mentioned above makes consumers get no response 
back, the consumer will automatically resend the request to the broker. Note 
that each request from the consumer is handled by a 
data-plane-kafka-request-handler thread. Therefore, another 
data-plane-kafka-request-handler thread will be also stuck at line 754 when 
handling the retry requests, because it tries to acquire the very same lock of 
the consumer group. The retry will occur repeatedly, and none of them can 
succeed. As a result, the pool of data-plane-kafka-request-handler threads will 
be full. Note that this pool of threads is responsible for handling all such 
requests from all producers and consumers. As a result, all the producers and 
consumers would be affected.

However, the backoff mechanism might be able to solve this issue, by reducing 
the number of requests in a short time and reserving more slots in the thread 
pool. Therefore, we increase the backoff config “retry-backoff-ms”, to see if 
the issue disappears. Specifically, we increase the retry backoff from 100ms 
(default) to 1000ms in consumer’s config. However, we found that the mentioned 
thread pool is full again, because there are multiple heartbeat requests that 
take up the slots of this thread pool. All those heartbeat request handling is 
stuck when they are acquiring the same consumer group lock, which has been 
acquired at line 754 as mentioned. Specifically, the heartbeat handling is 
stuck at GroupCoordinator.handleHeartbeat@624:
{code:java}
  def handleHeartbeat(groupId: String,
                      memberId: String,
                      groupInstanceId: Option[String],
                      generationId: Int,
                      responseCallback: Errors => Unit): Unit = {
..
      case Some(group) => group.inLock { // Line 624
..
      }
..
} {code}
The heartbeat requests are sent at the interval of 3000ms (by default) from the 
consumer. It has no backoff mechanism. The thread pool for 
data-plane-kafka-request-handler will be full soon.

Fix: 

Instead of waiting for the lock, we can just try to acquire the lock (probably 
with a time limit). If the acquisition fails, this request can be discarded so 
that other requests (which include the retry of the discarded one) can be 
processed. However, we feel this fix would affect the semantic of many 
operations. We would like to hear some suggestions from the community.

  was:
In Kafka-2.8.0, we found that the number of data plane Kafka request handlers 
may quickly reach the limit when only one request is stuck. As a result, all 
other requests that require a data plane request handler will be stuck.

When there is a slowdown inside the storeOffsets function at line 777 due to 
I/O operation, the thread holds the lock acquired at line 754.

 
{code:java}
  private def doCommitOffsets(group: GroupMetadata,
                              memberId: String,
                              groupInstanceId: Option[String],
                              generationId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit): Unit = {
    group.inLock { // Line 754
..
      groupManager.storeOffsets() // Line 777
..
  }
} {code}
Its call stack is:

 
{code:java}
kafka.coordinator.group.GroupMetadata,inLock,227
kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755
kafka.server.KafkaApis

[jira] [Created] (KAFKA-14886) Broker request handler thread pool is full due to single request slowdown

2023-04-10 Thread Haoze Wu (Jira)
Haoze Wu created KAFKA-14886:


 Summary: Broker request handler thread pool is full due to single 
request slowdown
 Key: KAFKA-14886
 URL: https://issues.apache.org/jira/browse/KAFKA-14886
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: Haoze Wu


In Kafka-2.8.0, we found that the number of data plane Kafka request handlers 
may quickly reach the limit when only one request is stuck. As a result, all 
other requests that require a data plane request handler will be stuck.

When there is a slowdown inside the storeOffsets function at line 777 due to 
I/O operation, the thread holds the lock acquired at line 754.

 
{code:java}
  private def doCommitOffsets(group: GroupMetadata,
                              memberId: String,
                              groupInstanceId: Option[String],
                              generationId: Int,
                              offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit): Unit = {
    group.inLock { // Line 754
..
      groupManager.storeOffsets() // Line 777
..
  }
} {code}
Its call stack is:

 
{code:java}
kafka.coordinator.group.GroupMetadata,inLock,227
kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755
kafka.server.KafkaApis,handleOffsetCommitRequest,515
kafka.server.KafkaApis,handle,175
kafka.server.KafkaRequestHandler,run,74
java.lang.Thread,run,748 {code}
This happens when the broker is handling the commit offset request from the 
consumer. When the slowdown mentioned above makes consumers get no response 
back, the consumer will automatically resend the request to the broker. Note 
that each request from the consumer is handled by a 
data-plane-kafka-request-handler thread. Therefore, another 
data-plane-kafka-request-handler thread will be also stuck at line 754 when 
handling the retry requests, because it tries to acquire the very same lock of 
the consumer group. The retry will occur repeatedly, and none of them can 
succeed. As a result, the pool of data-plane-kafka-request-handler threads will 
be full. Note that this pool of threads is responsible for handling all such 
requests from all producers and consumers. As a result, all the producers and 
consumers would be affected.

However, the backoff mechanism might be able to solve this issue, by reducing 
the number of requests in a short time and reserving more slots in the thread 
pool. Therefore, we increase the backoff config “retry-backoff-ms”, to see if 
the issue disappears. Specifically, we increase the retry backoff from 100ms 
(default) to 1000ms in consumer’s config. However, we found that the mentioned 
thread pool is full again, because there are multiple heartbeat requests that 
take up the slots of this thread pool. All those heartbeat request handling is 
stuck when they are acquiring the same consumer group lock, which has been 
acquired at line 754 as mentioned. Specifically, the heartbeat handling is 
stuck at GroupCoordinator.handleHeartbeat@624:
{code:java}
  def handleHeartbeat(groupId: String,
                      memberId: String,
                      groupInstanceId: Option[String],
                      generationId: Int,
                      responseCallback: Errors => Unit): Unit = {
..
      case Some(group) => group.inLock { // Line 624
..
      }
..
} {code}
The heartbeat requests are sent at the interval of 3000ms (by default) from the 
consumer. It has no backoff mechanism. The thread pool for 
data-plane-kafka-request-handler will be full soon.

Fix: 

Instead of waiting for the lock, we can just try to acquire the lock (probably 
with a time limit). If the acquisition fails, this request can be discarded so 
that other requests (which include the retry of the discarded one) can be 
processed. However, we feel this fix would affect the semantic of many 
operations. We would like to hear some suggestions from the community.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161854813


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##


Review Comment:
   Actually hmm -- I suppose this test is not present if you mean the exact 
path of returning the error and not producing to the log. I really did think I 
added such a test to replica manger test. I can try to add this path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161847053


##
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala:
##


Review Comment:
   Depends what you mean here. If you mean a unit test -- yes. If you mean a 
integration test, no because the correct behavior is built into the producer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction

2023-04-10 Thread via GitHub


jolshan commented on code in PR #13391:
URL: https://github.com/apache/kafka/pull/13391#discussion_r1161846085


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -35,6 +36,43 @@ trait ApiRequestHandler {
   def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit
 }
 
+object KafkaRequestHandler {
+  // Support for scheduling callbacks on a request thread.
+  private val threadRequestChannel = new ThreadLocal[RequestChannel]
+  private val currentRequest = new ThreadLocal[RequestChannel.Request]
+
+  // For testing
+  private var bypassThreadCheck = false
+  def setBypassThreadCheck(bypassCheck: Boolean): Unit = {
+bypassThreadCheck = bypassCheck
+  }
+  
+  def currentRequestOnThread(): RequestChannel.Request = {
+currentRequest.get()
+  }
+
+  /**
+   * Wrap callback to schedule it on a request thread.
+   * NOTE: this function must be called on a request thread.
+   * @param fun Callback function to execute
+   * @return Wrapped callback that would execute `fun` on a request thread
+   */
+  def wrap[T](fun: T => Unit)(request: RequestChannel.Request): T => Unit = {

Review Comment:
   My original concern was that if we just used the thread local, we would 
access it when the inner method is called. I guess I can just save a local 
variable when wrap is called and pass that value into the inner method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14420) MirrorMaker should not clear filtered configs on target topics

2023-04-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14420:
--
Fix Version/s: 3.5.0

> MirrorMaker should not clear filtered configs on target topics
> --
>
> Key: KAFKA-14420
> URL: https://issues.apache.org/jira/browse/KAFKA-14420
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.3.1
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.5.0
>
>
> If you set additional configurations on a remote topic, MirrorMaker will 
> clear them when it syncs topic configurations.
> The issue is that it also clears topic configurations that are filtered. For 
> example this prevents running Cruise Control on the target cluster as it may 
> set follower.replication.throttled.replicas and 
> leader.replication.throttled.replicas.
> MirrorMaker should not clear topic configurations that are filtered on the 
> target cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14420) MirrorMaker should not clear filtered configs on target topics

2023-04-10 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710197#comment-17710197
 ] 

Chris Egerton commented on KAFKA-14420:
---

Addressed with 
[KIP-894|https://cwiki.apache.org/confluence/display/KAFKA/KIP-894%3A+Use+incrementalAlterConfigs+API+for+syncing+topic+configurations].

> MirrorMaker should not clear filtered configs on target topics
> --
>
> Key: KAFKA-14420
> URL: https://issues.apache.org/jira/browse/KAFKA-14420
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.3.1
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.5.0
>
>
> If you set additional configurations on a remote topic, MirrorMaker will 
> clear them when it syncs topic configurations.
> The issue is that it also clears topic configurations that are filtered. For 
> example this prevents running Cruise Control on the target cluster as it may 
> set follower.replication.throttled.replicas and 
> leader.replication.throttled.replicas.
> MirrorMaker should not clear topic configurations that are filtered on the 
> target cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)

2023-04-10 Thread via GitHub


C0urante merged PR #13373:
URL: https://github.com/apache/kafka/pull/13373


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ppatierno commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-10 Thread via GitHub


ppatierno commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1161834571


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws 
IOException {
 state.initialize(new OffsetAndEpoch(0L, 0));
 raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
 
-assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
+assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"current-leader").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"current-vote").metricValue());
 assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
 
 state.transitionToFollower(2, 1);
-assertEquals("follower", getMetric(metrics, 
"current-state").metricValue());
+assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());

Review Comment:
   what I was referring here was a broker, which is not part of the controller 
quorum, but it still fetches the metadata topic from the leader controller but 
it is not part of the voters, because it has "broker" role and not part of the 
quorum. Isn't it an "observer"?
   This is also what we get by using the `kafka-metadata-quorum` bin tool which 
shows the brokers in the observer list. The purpose of the PR was kind of 
aligning the tool output with the KRaft current state metric which was not 
reporting a broker as "observer" but as "follower".
   I am not sure a broker has "resigned" in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ppatierno commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-10 Thread via GitHub


ppatierno commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1161834571


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws 
IOException {
 state.initialize(new OffsetAndEpoch(0L, 0));
 raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
 
-assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
+assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"current-leader").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"current-vote").metricValue());
 assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
 
 state.transitionToFollower(2, 1);
-assertEquals("follower", getMetric(metrics, 
"current-state").metricValue());
+assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());

Review Comment:
   what I was referring here was a broker, which is not part of the controller 
quorum, but it still fetches the metadata topic from the leader controller but 
it is not part of the voters, because it has "broker" role and not part of the 
quorum. Isn't it an "observer"?
   This is also what we get by using the `kafka-metadata-quorum` bin tool which 
shows the brokers in the observer list. The purpose of the PR was kind of 
aligning the tool output with the KRaft current state metric which was not 
reporting a broker as "observer" but as "follower".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors

2023-04-10 Thread via GitHub


C0urante commented on PR #13424:
URL: https://github.com/apache/kafka/pull/13424#issuecomment-1501942587

   @mimaison thanks for the reviews. Do you think you'll have time to take 
another look at this before the April 12th feature freeze?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors

2023-04-10 Thread via GitHub


C0urante commented on code in PR #13424:
URL: https://github.com/apache/kafka/pull/13424#discussion_r1161808636


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, 
final Map
 );
 }
 
+@Override
+public void stopConnector(final String connName, final Callback 
callback) {
+log.trace("Submitting request to transition connector {} to STOPPED 
state", connName);
+
+addRequest(
+() -> {
+refreshConfigSnapshot(workerSyncTimeoutMs);
+if (!configState.contains(connName))
+throw new NotFoundException("Unknown connector " + 
connName);
+
+// We only allow the leader to handle this request since 
it involves writing task configs to the config topic
+if (!isLeader()) {
+callback.onCompletion(new NotLeaderException("Only the 
leader can transition connectors to the STOPPED state.", leaderUrl()), null);
+return null;
+}
+
+// TODO: We may want to add a new ConfigBackingStore 
method for stopping a connector so that

Review Comment:
   I guess this wasn't really a `TODO` so much as a `MAYBE-DO`. I've removed it 
since on second thought it's a `DON'T-DO-FOR-NOW` as there is little if any 
benefit from that change, except possibly blocking the herder thread for less 
time if we're having trouble writing to the config topic.
   
   There's also the transactional producer logic that Yash mentioned, but since 
it's not guaranteed that we'll have access to one (if exactly-once source 
support is disabled), that's not worth considering right now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13453: KAFKA-12525: Ignoring Stale status statuses when reading from Status …

2023-04-10 Thread via GitHub


C0urante commented on PR #13453:
URL: https://github.com/apache/kafka/pull/13453#issuecomment-1501923378

   @vamossagar12 I think this approach is a bit too broad. We intentionally 
permit "unsafe" writes for reasons documented in the 
[AbstractHerder](https://github.com/apache/kafka/blob/17435484e4c49eef440ee412a711a88fed08bf50/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L90-L109)
 and 
[KafkaStatusBackingStore](https://github.com/apache/kafka/blob/17435484e4c49eef440ee412a711a88fed08bf50/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java#L74-L87)
 Javadocs. Specifically:
   
   > this prevents us from depending on the generation absolutely. If the group 
disappears and the generation is reset, then we'll overwrite the status 
information with the older (and larger) generation with the updated one. The 
danger of this approach is that slow starting tasks may cause the status to be 
overwritten after a rebalance has completed.
   
   I have a few alternatives in mind for how we might address this, but haven't 
fully thought any of them through yet since there are several KIP PRs that need 
to be reviewed right now which I'm giving priority to. To give me some idea of 
how highly to prioritize this once those are taken care of, can you let me know 
if this issue is actively affecting you or someone you know, or if the PR is a 
way to get up-to-speed with Kafka Connect, or something else?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #13466: MINOR: Fix base ConfigDef in AbstractHerder::connectorPluginConfig

2023-04-10 Thread via GitHub


C0urante commented on PR #13466:
URL: https://github.com/apache/kafka/pull/13466#issuecomment-1501875562

   Thanks Mickael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics

2023-04-10 Thread via GitHub


dengziming commented on code in PR #13525:
URL: https://github.com/apache/kafka/pull/13525#discussion_r1161721906


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws 
IOException {
 state.initialize(new OffsetAndEpoch(0L, 0));
 raftMetrics = new KafkaRaftMetrics(metrics, "raft", state);
 
-assertEquals("unattached", getMetric(metrics, 
"current-state").metricValue());
+assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"current-leader").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"current-vote").metricValue());
 assertEquals((double) 0, getMetric(metrics, 
"current-epoch").metricValue());
 assertEquals((double) -1L, getMetric(metrics, 
"high-watermark").metricValue());
 
 state.transitionToFollower(2, 1);
-assertEquals("follower", getMetric(metrics, 
"current-state").metricValue());
+assertEquals("observer", getMetric(metrics, 
"current-state").metricValue());

Review Comment:
   In our code design, observer/voter are the state of follower,  
leader/candidate/voted/follower/unattached/Resigned are the state of raft node 
state machine, so IMO, we'd better add "resigned" state instead of "observer" 
state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-10 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1161677863


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.cluster.Partition;
+import kafka.log.LogSegment;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RemoteLogInputStream;
+import org.apache.kafka.common.utils.ChildFirstClassLoader;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import 
org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
+import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LogSegmentData;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class is responsible for
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` 
instances
+ * - receives any leader and follower replica events and partition stop events 
and act on them
+ * - also provides APIs to fetch indexes, metadata about remote log segments
+ * - copying log segments to remote storage
+ */
+public class RemoteLogManager implements Closeable {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.c

[GitHub] [kafka] satishd commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-10 Thread via GitHub


satishd commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1161670474


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -0,0 +1,749 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.cluster.Partition;
+import kafka.log.LogSegment;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RemoteLogInputStream;
+import org.apache.kafka.common.utils.ChildFirstClassLoader;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import 
org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
+import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LogSegmentData;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class is responsible for
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` 
instances
+ * - receives any leader and follower replica events and partition stop events 
and act on them
+ * - also provides APIs to fetch indexes, metadata about remote log segments
+ * - copying log segments to remote storage
+ */
+public class RemoteLogManager implements Closeable {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.c

[GitHub] [kafka] satishd commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-10 Thread via GitHub


satishd commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1161286998


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -0,0 +1,749 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.cluster.Partition;
+import kafka.log.LogSegment;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RemoteLogInputStream;
+import org.apache.kafka.common.utils.ChildFirstClassLoader;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import 
org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
+import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LogSegmentData;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class is responsible for
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` 
instances
+ * - receives any leader and follower replica events and partition stop events 
and act on them
+ * - also provides APIs to fetch indexes, metadata about remote log segments
+ * - copying log segments to remote storage
+ */
+public class RemoteLogManager implements Closeable {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RemoteLogManager.c

[GitHub] [kafka] Stephan14 opened a new pull request, #13531: KAFKA-14885: fix kafka client connect to the broker that offline from…

2023-04-10 Thread via GitHub


Stephan14 opened a new pull request, #13531:
URL: https://github.com/apache/kafka/pull/13531

   … cluster
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-04-10 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1501643812

   > Do other committers have time to review this PR?
   
   @guozhangwang Hello, can you help to review these two PRs? this and 
https://github.com/apache/kafka/pull/13471


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-04-10 Thread via GitHub


mukkachaitanya commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1161593044


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath(
 builder.useParallelExecutor();
 Reflections reflections = new InternalReflections(builder);
 
-return new PluginScanResult(
-getPluginDesc(reflections, SinkConnector.class, loader),
-getPluginDesc(reflections, SourceConnector.class, loader),
-getPluginDesc(reflections, Converter.class, loader),
-getPluginDesc(reflections, HeaderConverter.class, loader),
-getTransformationPluginDesc(loader, reflections),
-getPredicatePluginDesc(loader, reflections),
-getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-);
+ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   `Plugins#withClassLoader` would be a nice touch to do these repetitive 
swaps; however, it doesn't seem to be a static method. I don't see a strong 
reason why it's not. If it's not too much out of the scope of this PR, can we 
make it static and use it to make the code cleaner? 
   
   Alternatively, if there are several instances where the code benefits form 
using the new static method we can tackle it with another refactor PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper

2023-04-10 Thread zou shengfu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu updated KAFKA-14885:

Description: 
When a broker has some issues about network, the broker can not connect to 
zookeeper and controller.At this time, we replace this broker with a new broker 
that has a same `broker.id` with fault broker, and we can not  stop the Kafka 
process on fault broker because of network issue. So the client can still 
connect this broker and can produce and consume messages normally.  But the 
data on fault broker maybe be lost because there are some leader for partitions 
on the fault broker. 

Do we have any good idea to solve this problem?

In my opinion, we can check  broker configuration (for example: broker ip)  
when broker reconnects to zookeeper and broker can exist if broker's 
configuration is not same with zookeeper. But if  broker can not reconnect to 
zookeeper successfully, maybe we need to check broker configuration between 
local disk and zookeeper periodically

 

  was:
When a broker has some issues about network, the broker can not connect to 
zookeeper and controller.At this time, we replace this broker with a new broker 
that has a same `broker.id` with fault broker, and we can not  stop the Kafka 
process on fault broker because of network issue. So the client can still 
connect this broker and can produce and consume messages normally.  But the 
data on fault broker maybe be lost because there are some leader for partitions 
on the fault broker. 

Do we have any good idea to solve this problem?

 


> Client can connect to broker and broker can not connect zookeeper
> -
>
> Key: KAFKA-14885
> URL: https://issues.apache.org/jira/browse/KAFKA-14885
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2
>Reporter: zou shengfu
>Assignee: zou shengfu
>Priority: Major
>
> When a broker has some issues about network, the broker can not connect to 
> zookeeper and controller.At this time, we replace this broker with a new 
> broker that has a same `broker.id` with fault broker, and we can not  stop 
> the Kafka process on fault broker because of network issue. So the client can 
> still connect this broker and can produce and consume messages normally.  But 
> the data on fault broker maybe be lost because there are some leader for 
> partitions on the fault broker. 
> Do we have any good idea to solve this problem?
> In my opinion, we can check  broker configuration (for example: broker ip)  
> when broker reconnects to zookeeper and broker can exist if broker's 
> configuration is not same with zookeeper. But if  broker can not reconnect to 
> zookeeper successfully, maybe we need to check broker configuration between 
> local disk and zookeeper periodically
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper

2023-04-10 Thread zou shengfu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zou shengfu updated KAFKA-14885:

Description: 
When a broker has some issues about network, the broker can not connect to 
zookeeper and controller.At this time, we replace this broker with a new broker 
that has a same `broker.id` with fault broker, and we can not  stop the Kafka 
process on fault broker because of network issue. So the client can still 
connect this broker and can produce and consume messages normally.  But the 
data on fault broker maybe be lost because there are some leader for partitions 
on the fault broker. 

Do we have any good idea to solve this problem?

 

> Client can connect to broker and broker can not connect zookeeper
> -
>
> Key: KAFKA-14885
> URL: https://issues.apache.org/jira/browse/KAFKA-14885
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.2
>Reporter: zou shengfu
>Assignee: zou shengfu
>Priority: Major
>
> When a broker has some issues about network, the broker can not connect to 
> zookeeper and controller.At this time, we replace this broker with a new 
> broker that has a same `broker.id` with fault broker, and we can not  stop 
> the Kafka process on fault broker because of network issue. So the client can 
> still connect this broker and can produce and consume messages normally.  But 
> the data on fault broker maybe be lost because there are some leader for 
> partitions on the fault broker. 
> Do we have any good idea to solve this problem?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14581) Move GetOffsetShell to tools

2023-04-10 Thread Ruslan Krivoshein (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708011#comment-17708011
 ] 

Ruslan Krivoshein edited comment on KAFKA-14581 at 4/10/23 8:12 AM:


Let it be booked for me, please.

 

UPD: I almost finished it basing on an unaccepted PR for blocking task (it 
looks completed).


was (Author: krivosheinruslan):
Let it be booked for me, please

> Move GetOffsetShell to tools
> 
>
> Key: KAFKA-14581
> URL: https://issues.apache.org/jira/browse/KAFKA-14581
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper

2023-04-10 Thread zou shengfu (Jira)
zou shengfu created KAFKA-14885:
---

 Summary: Client can connect to broker and broker can not connect 
zookeeper
 Key: KAFKA-14885
 URL: https://issues.apache.org/jira/browse/KAFKA-14885
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14873) Pluggable storage for Kafka Connect internal topics

2023-04-10 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710078#comment-17710078
 ] 

Sagar Rao commented on KAFKA-14873:
---

Hi [~malthe] , thanks for reporting this. I think your point makes sense in 
general terms. That's why, if you see all the3 backing stores are implemented 
as interfaces for example => `OffsetBackingStore`, `StatusBackingStore` and 
`ConfigBackingStore`.

The implementations that we see for Kafka are just implementors of these 
interfaces. Typically, there is a `MemoryBased` equivalent to these backing 
stores which IIUC is used in standalone mode.

Do you have any KV based backing stores in mind that you would want to see 
being added? I see Azure Table Storage has been cited as an example in OP. If 
you think something like this is needed, then it would need to go through a KIP 
process since a new public interface is being added. Let me know if you want to 
take it further via a KIP.

> Pluggable storage for Kafka Connect internal topics
> ---
>
> Key: KAFKA-14873
> URL: https://issues.apache.org/jira/browse/KAFKA-14873
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Malthe Borch
>Priority: Major
>  Labels: needs-kip
>
> The Kafka Connect framework relies on compacted topics to store config, 
> offset and status information for each connector.
> This conflates two kinds of data, control and content, which some people 
> disagree with. Notably, [Azure Event 
> Hub|https://learn.microsoft.com/en-us/azure/event-hubs/log-compaction] does 
> not (or _did not_, because there's currently a preview release out which does 
> have support for compacted topics albeit only at the more expensive premium 
> tiers).
> In some deployments, it may be desirable to use a different backend for these 
> control settings (which essentially take a key/value form), for example 
> [Azure Table 
> Storage|https://learn.microsoft.com/en-us/rest/api/storageservices/table-service-rest-api]
>  – basically any key/value store that provides the Write-If-Matches primitive 
> to update a key only if the current value matches a known value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted

2023-04-10 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710077#comment-17710077
 ] 

Sagar Rao commented on KAFKA-14750:
---

I ran a few more tests for this. This issue is found specifically when a mass 
delete of topics is issued. 

Typically, if a few topics is deleted ( I tested with topics ranging from 
1-10), the `position` API doesn't fail. It says the following 

```

[2023-04-10 12:19:33,115] INFO [local-file-sink-300|task-0] [Consumer 
clientId=connector-consumer-local-file-sink-300-0, 
groupId=connect-local-file-sink-300] Resetting offset for partition 
connect-test-3010-0 to position FetchPosition\{offset=0, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: 
null)], epoch=0}}. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:399)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-300-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3002-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3003-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3004-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3005-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3006-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3007-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3008-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3009-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] 
WorkerSinkTask\{id=local-file-sink-300-0} Assigned topic partition 
connect-test-3010-0 with offset 0 
(org.apache.kafka.connect.runtime.WorkerSinkTask:700)

[2023-04-10 12:19:33,660] WARN [local-file-sink-300|task-0] [Consumer 
clientId=connector-consumer-local-file-sink-300-0, 
groupId=connect-local-file-sink-300] Received unknown topic or partition error 
in fetch for partition connect-test-3002-0 
(org.apache.kafka.clients.consumer.internals.Fetcher:1340)

[2023-04-10 12:19:33,661] INFO [local-file-sink-300|task-0] [Consumer 
clientId=connector-consumer-local-file-sink-300-0, 
groupId=connect-local-file-sink-300] Request joining group due to: cached 
metadata has changed from (version362: \{connect-test-3007=1, 
connect-test-3008=1, connect-test-3009=1, connect-test-3003=1, 
connect-test-300=1, connect-test-3004=1, connect-test-3005=1, 
connect-test-3006=1, connect-test-3010=1, connect-test-3002=1}) at the 
beginning of the rebalance to (version363: \{connect-test-3007=1, 
connect-test-3008=1, connect-test-3009=1, connect-test-3003=1, 
connect-test-300=1, connect-test-3004=1, connect-test-3005=1, 
connect-test-3006=1, connect-test-3010=1}) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1066)

[2023-04-10 12:19:34,165] INFO [local-file-sink-300|task-0] [Consumer 
clientId=connector-consumer-local-file-sink-300-0, 
groupId=connect-local-file-sink-300] Revoke previously assigned partitions 
connect-test-300-0, connect-test-3002-0, connect-test-3003-0, 
connect-test-3004-0, connect-test-3005-0, connect-test-3006-0, 
connect-test-3007-0, connect-test-3008-0, connect-test-3009-0, 
connect-test-3010-0 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:331)

```

 

Notice the second `Request joining group due to: cached metadata has changed 
from` after 
Received unknown topic or partition error in fetch for partition
In the sort of test that was executed above, the line `
Received unknown topic or partition error in fetch for partition
is not foll