[GitHub] [kafka] satishd opened a new pull request, #13535: [DRAFT] KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd opened a new pull request, #13535: URL: https://github.com/apache/kafka/pull/13535

This PR is not yet ready for review. It is built on top of PR #13487.

This PR includes:
- Recognize fetch requests with out-of-range local log offsets
- Add a fetch implementation for data lying in the range [logStartOffset, localLogStartOffset]
- Add a new purgatory for async remote read requests, which are served through a dedicated thread pool

TODO: Add more tests for the newly introduced changes. Some tests for these scenarios exist in 2.8.x and are being refactored against the trunk changes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
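The routing logic the PR description refers to can be pictured as a simple range check: offsets below the local log start offset (but still at or above the log start offset) must be served from remote storage. The following is an illustrative sketch only; all names are hypothetical and this is not Kafka's actual implementation.

```java
// Hypothetical sketch of the fetch-routing decision: fetches for offsets
// below the local log start offset are served from tiered (remote) storage.
public class FetchRouting {
    // Illustrative names, not Kafka's actual API.
    static String routeFetch(long fetchOffset, long logStartOffset,
                             long localLogStartOffset, long logEndOffset) {
        if (fetchOffset < logStartOffset || fetchOffset >= logEndOffset) {
            return "OFFSET_OUT_OF_RANGE";
        } else if (fetchOffset < localLogStartOffset) {
            // Would be completed asynchronously via the remote-read
            // purgatory and its dedicated thread pool.
            return "REMOTE";
        } else {
            return "LOCAL";
        }
    }
}
```

The interesting case is the middle branch: such requests cannot be answered from the local log, which is why the PR introduces a separate purgatory rather than blocking request handler threads on remote reads.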
[GitHub] [kafka] emissionnebula commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
emissionnebula commented on PR #13280: URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502743377 > @ijuma @rondagostino : I feel like "persistent" should appear somewhere in the class names here. Perhaps you're right that it doesn't need to be in the short class name, but can we put the classes in a namespace that includes that word? So something like `org.apache.kafka.server.persistent`? Then we'd have `org.apache.kafka.server.persistent.ImmutableMap`, etc. > PMap or PersistentMap are also possible, but they can be confused with a map that persists the data I feel we should not only use `persistent` in the package name but also name the class `PersistentMap`. "Persistent" is not a new term: a 1989 [paper](https://www.cs.cmu.edu/~sleator/papers/making-data-structures-persistent.pdf) coined it, Clojure defines PersistentVector and PersistentMap, and languages such as Scala, Elixir, and Haskell use the term in their documentation as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
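For readers unfamiliar with the term being debated above: a "persistent" data structure is one where updates return a new version while all older versions remain valid and readable. A deliberately naive sketch (copy-on-write; real persistent collections such as Clojure's PersistentHashMap share structure instead of copying, but the observable behavior is the same):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: updates produce a new version; old versions stay intact.
public final class NaivePersistentMap<K, V> {
    private final Map<K, V> entries;

    public NaivePersistentMap() { this.entries = Collections.emptyMap(); }
    private NaivePersistentMap(Map<K, V> entries) { this.entries = entries; }

    // Returns a NEW map containing the mapping; this map is left unchanged.
    public NaivePersistentMap<K, V> updated(K key, V value) {
        Map<K, V> copy = new HashMap<>(entries);
        copy.put(key, value);
        return new NaivePersistentMap<>(copy);
    }

    public V get(K key) { return entries.get(key); }
    public int size() { return entries.size(); }
}
```

This is also why "persistent" can be confused with "persisted to disk", which is the concern raised in the quoted comment: the term refers to version persistence in memory, not durability.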
[jira] [Created] (KAFKA-14889) RemoteLogManager - allow consumer fetch records from remote storage implementation
Luke Chen created KAFKA-14889: - Summary: RemoteLogManager - allow consumer fetch records from remote storage implementation Key: KAFKA-14889 URL: https://issues.apache.org/jira/browse/KAFKA-14889 Project: Kafka Issue Type: Sub-task Reporter: Luke Chen Implementation of RLM as mentioned in the HLD section of KIP-405; this JIRA covers enabling consumers to fetch records from remote storage. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation
[ https://issues.apache.org/jira/browse/KAFKA-14888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-14888: - Assignee: Luke Chen > RemoteLogManager - deleting expired/size breached log segments to remote > storage implementation > > > Key: KAFKA-14888 > URL: https://issues.apache.org/jira/browse/KAFKA-14888 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA > covers deleting time/size breached log segments in remote storage. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14889) RemoteLogManager - allow consumer fetch records from remote storage implementation
[ https://issues.apache.org/jira/browse/KAFKA-14889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-14889: - Assignee: Satish Duggana > RemoteLogManager - allow consumer fetch records from remote storage > implementation > --- > > Key: KAFKA-14889 > URL: https://issues.apache.org/jira/browse/KAFKA-14889 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Satish Duggana >Priority: Major > > Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA > covers enabling consumers fetch records from remote storage > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign] > h4. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter
vcrfxia commented on code in PR #13496: URL: https://github.com/apache/kafka/pull/13496#discussion_r1162213072 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java: ## @@ -153,11 +154,37 @@ public void init(final ProcessorContext context) { @Override public ValueAndTimestamp get(final K key) { -final ValueAndTimestamp valueAndTimestamp1 = valueGetter1.get(key); +return computeJoin(key, valueGetter1::get, valueGetter2::get); +} + +@Override +public ValueAndTimestamp get(final K key, final long asOfTimestamp) { Review Comment: > You say "older join result" -- don't think they would get a join result, would they? They get what should've been a join result, if the join were to emit a complete history of older join results (which it does not due to computational expediency). Here's a concrete example to check we're on the same page. Suppose we have an inner join, and all records are for the same key: ``` A: (a5, ts=5) B: (b1, ts=1) --> triggers join result (a5, b1) with ts=5 A: (a2, ts=2) --> no new join result, because this record is out-of-order ``` If the result is not materialized and someone calls `get(k, 2)` on the value getter, then the value getter will join `a2` and `b1` on the fly and return `(a2, b1)` even though this was never emitted downstream. I gave this some more thought and I think the behavior could be desirable, even though I agree with your statement: > given how the "join processor" works, it basically get a versioned input ktable, and produced a non-versioned ktable and drops out-of-order records. So if we would only expose a value-getter that does not support get(k, ts) it would be reasonable to me. I think there actually is a situation in which `get(k, ts)` would be called on this join value getter today. 
If the table-table join result is not materialized and is directly joined to a stream, then if the table-table join result is identified as "versioned", the stream-table join will call `get(k, ts)` on the value getter. This situation is really interesting because it would be wrong for the user to explicitly materialize the result of the table-table join with a versioned store and then join it to the stream, but if they do not explicitly materialize the result and instead perform the join directly, then they can get proper stream-table join semantics using the value getter. Assuming my understanding is correct, we have two options: 1. Say that the result of a table-table join (where both input tables are versioned) where the result is not explicitly materialized is versioned, and have the value getter support versioning, as in the current PR. Then the stream-(table-table) join uses fully versioned semantics and returns correct results. 2. Say that the result of a table-table join (where both input tables are versioned) where the result is not explicitly materialized is not versioned, and update the value getter to reflect this. Then the stream-(table-table) join does not use versioned semantics. Users need to perform a (stream-table)-table join to get versioned semantics instead. The first option is nice in that now `stream-(table-table)` and `(stream-table)-table` joins with no intermediate materialization produce the same results, but it's also confusing because `stream-(table-table)` produces different results if the user materializes the result of the table-table join as a versioned store (which is wrong). WDYT? I'm happy with either approach now that I feel we've discussed all angles fully. We do need to make a decision in this PR, though: if my understanding is correct that `get(k, ts)` will be called from downstream stream-table joins when it's supported, then there will be compatibility implications for changing it in the future.
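The on-the-fly lookup behavior described in the example above (where `get(k, 2)` joins `a2` and `b1` even though that pair was never emitted downstream) can be sketched roughly as follows. This is an illustrative model only, not Kafka Streams' actual `KTableValueGetter` code: each side's history for a key is modeled as a timestamp-ordered map, and an as-of-timestamp lookup joins the latest value from each side with timestamp <= ts.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of a versioned inner-join lookup for a single key.
public class VersionedJoinSketch {
    // history: record timestamp -> value, for one key on one side of the join
    static String joinAsOf(TreeMap<Long, String> left,
                           TreeMap<Long, String> right, long ts) {
        Map.Entry<Long, String> l = left.floorEntry(ts);  // latest left value at/before ts
        Map.Entry<Long, String> r = right.floorEntry(ts); // latest right value at/before ts
        if (l == null || r == null) return null; // inner join: need both sides
        return "(" + l.getValue() + ", " + r.getValue() + ")";
    }
}
```

With left history {2: a2, 5: a5} and right history {1: b1}, a lookup at ts=2 yields (a2, b1), matching the scenario in the comment above even though the join processor never emitted that result downstream.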
[jira] [Created] (KAFKA-14888) RemoteLogManager - deleting expired/size breached log segments to remote storage implementation
Luke Chen created KAFKA-14888: - Summary: RemoteLogManager - deleting expired/size breached log segments to remote storage implementation Key: KAFKA-14888 URL: https://issues.apache.org/jira/browse/KAFKA-14888 Project: Kafka Issue Type: Sub-task Reporter: Luke Chen Implementation of RLM as mentioned in the HLD section of KIP-405, this JIRA covers deleting time/size breached log segments in remote storage. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14834) Improved processor semantics for versioned stores
[ https://issues.apache.org/jira/browse/KAFKA-14834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victoria Xia updated KAFKA-14834: - Description: With the introduction of versioned state stores in [KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores], we should leverage them to provide improved join semantics. As described in [KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores], we will make the following four improvements: * stream-table joins will perform a timestamped lookup (using the stream-side record timestamp) if the table is versioned * table-table joins, including foreign key joins, will not produce new join results on out-of-order records (by key) from versioned tables * table filters will disable the existing optimization to not send duplicate tombstones when applied to a versioned table * table aggregations will ignore out-of-order records when aggregating a versioned table was: With the introduction of versioned state stores in [KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores], we should leverage them to provide improved join semantics. 
As described in [KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores], we will make the following two improvements: * stream-table joins will perform a timestamped lookup (using the stream-side record timestamp) if the table is materialized with a versioned store * table-table joins, including foreign key joins, will not produce new join results on out-of-order records (by key) from tables materialized with versioned stores Summary: Improved processor semantics for versioned stores (was: Improved stream-table and table-table join semantics for versioned stores) > Improved processor semantics for versioned stores > - > > Key: KAFKA-14834 > URL: https://issues.apache.org/jira/browse/KAFKA-14834 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Victoria Xia >Assignee: Victoria Xia >Priority: Major > Labels: kip, streams > > With the introduction of versioned state stores in > [KIP-889|https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores], > we should leverage them to provide improved join semantics. > As described in > [KIP-914|https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+DSL+Processor+Semantics+for+Versioned+Stores], > we will make the following four improvements: > * stream-table joins will perform a timestamped lookup (using the > stream-side record timestamp) if the table is versioned > * table-table joins, including foreign key joins, will not produce new join > results on out-of-order records (by key) from versioned tables > * table filters will disable the existing optimization to not send duplicate > tombstones when applied to a versioned table > * table aggregations will ignore out-of-order records when aggregating a > versioned table -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
ijuma commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191919 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { +/** + * @return the underlying persistent map + */ +Object underlying(); + +/** + * @param key the key + * @param value the value + * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary) + */ +ImmutableMap updated(K key, V value); Review Comment: I was trying to avoid inventing a new naming convention and hence went with what `Scala` had chosen. If we have another convention we like more that there is a precedent for, happy to discuss. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
ijuma commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191307 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { Review Comment: I think this looks pretty clean now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
ijuma commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162191307 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { Review Comment: I think this looks pretty clean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162184665 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { +/** + * @return the underlying persistent map + */ +Object underlying(); Review Comment: I removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
rondagostino commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162184505 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { Review Comment: I updated it. I removed the factory in favor of static methods on the interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on pull request #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing
rondagostino commented on PR #13532: URL: https://github.com/apache/kafka/pull/13532#issuecomment-1502480192 Test failures are unrelated. `MirrorConnectorsWithCustomForwardingAdminIntegrationTest` passed locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14887) ZK session timeout can cause broker to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-14887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ron Dagostino reassigned KAFKA-14887: - Assignee: Ron Dagostino > ZK session timeout can cause broker to shutdown > --- > > Key: KAFKA-14887 > URL: https://issues.apache.org/jira/browse/KAFKA-14887 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, > 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 3.1.0, 2.7.1, 2.8.0, > 2.7.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > > We have the following code in FinalizedFeatureChangeListener.scala which will > exit regardless of the type of exception that is thrown when trying to > process feature changes: > case e: Exception => { > error("Failed to process feature ZK node change event. The broker > will eventually exit.", e) > throw new FatalExitError(1) > The issue here is that this does not distinguish between exceptions caused by > an inability to process a feature change and an exception caused by a > ZooKeeper session timeout. We want to shut the broker down for the former > case, but we do NOT want to shut the broker down in the latter case; the > ZooKeeper session will eventually be reestablished, and we can continue > processing at that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup
philipnee commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1162151428 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -447,6 +482,19 @@ public void close() { @Override public void close(Duration timeout) { +if (timeout.toMillis() < 0) +throw new IllegalArgumentException("The timeout cannot be negative."); +try { +if (!closed) { +close(timeout, false); +} +} finally { +closed = true; Review Comment: Actually, that's a good point. I did this because it was the original implementation and I didn't put too much thought into it. For the current threading model, we definitely want to avoid multiple threads trying to close the client. Can we synchronize on this method to prevent multithreaded access? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
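Besides synchronizing the method, one alternative that comes up in this kind of review is a compare-and-set on an atomic flag, so only the first caller performs the real close and later callers return immediately without contending on a lock. A hedged sketch, not the PR's actual code:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative idempotent close: the CAS guarantees the inner close runs
// exactly once even if multiple threads call close() concurrently.
public class IdempotentClose {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    int closeCalls = 0; // exposed only so the behavior is observable here

    public void close(Duration timeout) {
        if (timeout.toMillis() < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");
        // Only the thread that flips false -> true performs the real close.
        if (closed.compareAndSet(false, true)) {
            doClose(timeout);
        }
    }

    private void doClose(Duration timeout) { closeCalls++; }
}
```

A trade-off worth noting: with the CAS placed before the inner close, a failed close still marks the instance closed, whereas setting the flag in a `finally` after the call (as in the diff above) has the same property but without mutual exclusion between concurrent callers.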
[GitHub] [kafka] mjsax opened a new pull request, #13534: KAFKA-14054: Handle TimeoutException gracefully
mjsax opened a new pull request, #13534: URL: https://github.com/apache/kafka/pull/13534 We incorrectly assumed that `consumer.position()` would always be served from the consumer's locally set position. However, within `commitNeeded()` we first check `if (commitNeeded)` and thus go into the else branch only if we have not processed data (otherwise, `commitNeeded` would be true). For this reason, we actually don't know whether the consumer has a valid position or not. We should just swallow a timeout if the consumer cannot get the position from the broker, and try the next partition. If any position advances, we can return true, and if we time out for all partitions we can return false. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
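The per-partition timeout handling described in the PR text can be sketched as follows. This is an illustrative model with hypothetical names (the real code uses Kafka's unchecked `org.apache.kafka.common.errors.TimeoutException` and `TopicPartition`); it only shows the control flow: swallow a timeout for one partition and move on, return true as soon as any position has advanced past the committed offset, and return false if no partition shows progress.

```java
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class CommitNeededSketch {
    interface PositionSource {
        // Returns the current consumed position for a partition,
        // or throws if the broker lookup times out.
        long position(String partition) throws TimeoutException;
    }

    static boolean commitNeeded(Map<String, Long> committed, PositionSource consumer) {
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            try {
                if (consumer.position(e.getKey()) > e.getValue()) {
                    return true; // some position advanced: a commit is needed
                }
            } catch (TimeoutException te) {
                // Cannot determine this partition's position right now;
                // swallow the timeout and try the next partition.
            }
        }
        return false; // timed out or saw no progress on every partition
    }
}
```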
[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on PR #13280: URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502416844 > Yes, updated would only make sense on the map interface. For the Set interface, added seems reasonable. Sorry to bikeshed this again but I kinda like `withAdded`, `withRemoval`, etc. Curious how you feel about those. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on PR #13280: URL: https://github.com/apache/kafka/pull/13280#issuecomment-1502415755 @ijuma @rondagostino : I feel like "persistent" should appear somewhere in the class names here. Perhaps you're right that it doesn't need to be in the short class name, but can we put the classes in a namespace that includes that word? So something like `org.apache.kafka.server.persistent`? Then we'd have `org.apache.kafka.server.persistent.ImmutableMap`, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162134827 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { +/** + * @return the underlying persistent map + */ +Object underlying(); + +/** + * @param key the key + * @param value the value + * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary) + */ +ImmutableMap updated(K key, V value); Review Comment: how do you guys feel about `withUpdate` as a name? ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param the key type + * @param the value type + */ +public interface ImmutableMap extends Map { +/** + * @return the underlying persistent map + */ +Object underlying(); + +/** + * @param key the key + * @param value the value + * @return a wrapped persistent map that differs from this one in that the given mapping is added (if necessary) + */ +ImmutableMap updated(K key, V value); + +/** + * @param key the key + * @return a wrapped persistent map that differs from this one in that the given mapping is removed (if necessary) + */ +ImmutableMap removed(K key); Review Comment: how do you guys feel about `withRemoval` as a name? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a diff in pull request #13490: KAFKA-14875: Implement wakeup
vvcephei commented on code in PR #13490: URL: https://github.com/apache/kafka/pull/13490#discussion_r1162130597 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -447,6 +482,19 @@ public void close() { @Override public void close(Duration timeout) { +if (timeout.toMillis() < 0) +throw new IllegalArgumentException("The timeout cannot be negative."); +try { +if (!closed) { +close(timeout, false); +} +} finally { +closed = true; Review Comment: It's interesting to set this only after the call to close completes or fails. I could see setting it only after it completes to ensure it's really closed, or setting it with a CAS before calling the inner close so that multiple callers don't all try to close at once. I'm not sure I see the rationale of doing it this way, though. If there is a reason I'm not seeing, could you add a comment explaining it so the code will be clear? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -319,26 +328,52 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> + * If the timeout specified by {@code default.api.timeout.ms} expires + * {@link org.apache.kafka.common.errors.TimeoutException} is thrown. + * + * @param partitions The partition to check + * @param timeout The maximum time to block. + * @return The last committed offset and metadata or null if there was no prior commit + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * the timeout specified by {@code default.api.timeout.ms} expires. + */ @Override public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition> partitions, final Duration timeout) { +maybeWakeup(); maybeThrowInvalidGroupIdException(); + if (partitions.isEmpty()) { return new HashMap<>(); } final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions); +activeFutures.add(event.future()); eventHandler.add(event); try { -return event.complete(Duration.ofMillis(100)); + +return event.complete(timeout); +} catch (ExecutionException e) { +throw new KafkaException(e); } catch (InterruptedException e) { -throw new InterruptException(e); +throw new InterruptException(e.getMessage()); Review Comment: What's up with this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
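The CAS suggestion in the first review comment above can be sketched with `AtomicBoolean.compareAndSet`: flip the flag atomically before running the inner close, so that concurrent callers cannot both enter it. The class and method names below are illustrative, not the actual consumer code:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the close-once pattern from the review comment: only the first
// caller to win the compareAndSet performs the real close; later (or
// concurrent) callers see `closed` already true and return immediately.
final class CloseOnce {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int innerCloseCalls = 0;

    void close(Duration timeout) {
        if (timeout.toMillis() < 0) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        // Atomically transition false -> true; exactly one caller succeeds.
        if (closed.compareAndSet(false, true)) {
            innerClose(timeout);
        }
    }

    private void innerClose(Duration timeout) {
        // stand-in for the real shutdown work
        innerCloseCalls++;
    }

    int innerCloseCalls() {
        return innerCloseCalls;
    }
}
```

The trade-off the reviewer notes still applies: with the CAS-first approach, `closed` is true even if the inner close then fails, whereas setting it in a `finally` after the call (as in the diff) cannot distinguish "closed" from "close attempted".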
[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162131601 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param <K> the key type + * @param <V> the value type + */ +public interface ImmutableMap<K, V> extends Map<K, V> { Review Comment: I don't feel very strongly about this either way. I like the elegance of having a separate factory instead of static functions, but I do wonder if that will make it harder to do a quick replacement of the current implementation with the Vavr implementation (or whatever) and rerun the JMH benchmarks without changing anything else. Maybe it's fine as-is, though? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
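The factory indirection debated above can be sketched as follows: code under benchmark obtains its maps only through a factory, so swapping the backing implementation (say, a pcollections- or Vavr-based persistent map) for a JMH rerun is a change to one method body. The interface and class names here are hypothetical, not the actual Kafka API:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative factory indirection: call sites never name a concrete map
// implementation, so it can be swapped in one place.
interface MapFactory {
    <K, V> Map<K, V> emptyMap();
}

final class JdkMapFactory implements MapFactory {
    @Override
    public <K, V> Map<K, V> emptyMap() {
        // A different factory could return a persistent-collection wrapper
        // here without touching any call site or benchmark code.
        return Collections.unmodifiableMap(new HashMap<>());
    }
}
```

With static factory methods on the interface itself, the same swap would instead require editing the interface, which is the practical concern raised in the comment.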
[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162129955 ## server-common/src/main/java/org/apache/kafka/server/immutable/ImmutableMap.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.immutable; + +import java.util.Map; + +/** + * A persistent Hash-based Map wrapper. + * java.util.Map methods that mutate in-place will throw UnsupportedOperationException + * + * @param <K> the key type + * @param <V> the value type + */ +public interface ImmutableMap<K, V> extends Map<K, V> { +/** + * @return the underlying persistent map + */ +Object underlying(); Review Comment: Yeah, I think it would be good to avoid this unless there is a concrete reason we need it... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162128357 ## metadata/src/main/java/org/apache/kafka/image/TopicsImage.java: ## @@ -76,8 +84,8 @@ public TopicImage getTopic(String name) { } public void write(ImageWriter writer, ImageWriterOptions options) { -for (TopicImage topicImage : topicsById.values()) { -topicImage.write(writer, options); +for (Map.Entry<Uuid, TopicImage> entry : topicsById.entrySet()) { +entry.getValue().write(writer, options); Review Comment: The `java.util.Collection` returned by `java.util.Map#values` is pretty basic. I can't find any comment about how its equals method is supposed to work. For `HashMap#values`, it seems to just be doing reference equality. (In contrast, `Map#keySet` returns an actual Set which is compared how you would expect.) @rondagostino, like you, I wrote a test program with `java.util.HashMap` and `java.util.HashSet` and got these very similar results ``` foo = {a->a, b->b} bar = {a, b} foo.values().equals(foo.values()) = true new HashSet<>(foo.values()).equals(bar) = true foo.values().equals(bar) = false bar.equals(foo.values()) = false foo.keySet().equals(bar) = true bar.equals(foo.keySet()) = true ``` > We could, but it is marked deprecated in the library because there is no way to provide a reasonable .equals() method. I actually checked, and indeed it is true.

What version of the pcollections source code were you looking at? I downloaded the source from https://github.com/hrldcpr/pcollections and wasn't able to find any comment or deprecated indicator for `HashPMap#values()`. In fact, it looks like it simply inherits the `AbstractMap#values` implementation without any changes. I suspect that this will actually implement reference equality, since this implementation saves the Collection object it creates in a `transient` field (ew). But even leaving that aside, I can't find any API guarantees about what the equals method of the collection returned by `Map#values` is supposed to do. It's possible that this is just undefined. At any rate, the existing behavior of the java.util.Map subclasses is certainly useless here (it will not be what anyone expects). `Collection#equals` says you can do whatever you want for `equals`, but you should "exercise care". Zooming out a bit, the big picture is that interfaces like List or Set define a reasonable behavior for equals, whereas Collection (which is a parent interface for both) is just like :shrug: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
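A runnable version of the experiment quoted above makes the asymmetry concrete: `Map#values` returns a bare `Collection` that inherits `Object`'s reference-equality `equals`, while `Map#keySet` returns a `Set` with the element-wise `equals` the `Set` interface defines.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Builds the same foo/bar fixtures as the test program in the comment above.
final class ValuesEqualsDemo {
    static Map<String, String> foo() {
        Map<String, String> m = new HashMap<>();
        m.put("a", "a");
        m.put("b", "b");
        return m;
    }

    static Set<String> bar() {
        Set<String> s = new HashSet<>();
        s.add("a");
        s.add("b");
        return s;
    }
}
```

Note that `foo.values().equals(foo.values())` returns true only because `HashMap` caches and returns the same values-view instance, not because the elements are compared.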
[jira] [Assigned] (KAFKA-14054) Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-14054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-14054: --- Assignee: Matthias J. Sax > Unexpected client shutdown as TimeoutException is thrown as > IllegalStateException > - > > Key: KAFKA-14054 > URL: https://issues.apache.org/jira/browse/KAFKA-14054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.1.0, 3.2.0, 3.1.1 >Reporter: Donald >Assignee: Matthias J. Sax >Priority: Major > > Re: > https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2 > 1) TimeoutException is thrown as IllegalStateException in > {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}. > Which causes the client to shutdown in > {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}. > 2) Should Timeout be a recoverable error which is expected to be handled by > User? > 3) This issue is exposed by change KAFKA-12887 which was introduced in > kafka-streams ver 3.1.0 > *code referenced* > {code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded} > public boolean commitNeeded() { > if (commitNeeded) { > return true; > } else { > for (final Map.Entry entry : > consumedOffsets.entrySet()) { > final TopicPartition partition = entry.getKey(); > try { > final long offset = mainConsumer.position(partition); > if (offset > entry.getValue() + 1) { > commitNeeded = true; > entry.setValue(offset - 1); > } > } catch (final TimeoutException error) { > // the `consumer.position()` call should never block, > because we know that we did process data > // for the requested partition and thus the consumer > should have a valid local position > // that it can return immediately > // hence, a `TimeoutException` indicates a bug and thus > we rethrow it as fatal `IllegalStateException` > throw new IllegalStateException(error); > } catch (final KafkaException fatal) { 
> throw new StreamsException(fatal); > } > } > return commitNeeded; > } > } > {code} > {code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable} > private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse > getActionForThrowable(final Throwable throwable, > > final StreamsUncaughtExceptionHandler > streamsUncaughtExceptionHandler) { > final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse > action; > if (wrappedExceptionIsIn(throwable, > EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) { > action = SHUTDOWN_CLIENT; > } else { > action = streamsUncaughtExceptionHandler.handle(throwable); > } > return action; > } > private void handleStreamsUncaughtException(final Throwable throwable, > final > StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, > final boolean > skipThreadReplacement) { > final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse > action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler); > if (oldHandler) { > log.warn("Stream's new uncaught exception handler is set as well > as the deprecated old handler." + > "The old handler will be ignored as long as a new handler > is set."); > } > switch (action) { > case REPLACE_THREAD: > if (!skipThreadReplacement) { > log.error("Replacing thread in the streams uncaught > exception handler", throwable); > replaceStreamThread(throwable); > } else { > log.debug("Skipping thread replacement for recoverable > error"); > } > break; > case SHUTDOWN_CLIENT: > log.error("Encountered the following exception during > processing " + > "and Kafka Streams opted to " + action + "." + > " The streams client is going to shut down now. ", > throwable); > closeToError(); > break; > {code} > *Stacktrace* > {code:java|title=error log kafka-
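The wrapping behavior the report describes can be reproduced with a stdlib-only sketch: the timeout is rethrown as a fatal `IllegalStateException` with the original exception preserved as the cause, which is why the client-shutdown path sees an `IllegalStateException` rather than a recoverable `TimeoutException`. The `CommitCheck` class and its boolean switch are illustrative stand-ins, not the Streams code:

```java
import java.util.concurrent.TimeoutException;

// Sketch of the rethrow pattern in StreamTask#commitNeeded: a timeout from
// the consumer position lookup is treated as a bug and rethrown as a fatal
// IllegalStateException, with the TimeoutException kept as the cause.
final class CommitCheck {
    static boolean commitNeeded(boolean positionTimesOut) {
        try {
            if (positionTimesOut) {
                // stand-in for mainConsumer.position(partition) timing out
                throw new TimeoutException("position lookup timed out");
            }
            return false;
        } catch (TimeoutException error) {
            throw new IllegalStateException(error);
        }
    }
}
```

A handler that wanted to treat this case as recoverable would therefore have to unwrap via `getCause()` to see the underlying `TimeoutException`, which is the crux of the issue.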
[GitHub] [kafka] mjsax commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration
mjsax commented on PR #13269: URL: https://github.com/apache/kafka/pull/13269#issuecomment-1502392008 @cadonna -- Should we cherry-pick to 3.4 branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
mjsax commented on PR #10747: URL: https://github.com/apache/kafka/pull/10747#issuecomment-1502379032 Follow-up PR to use varint encoding: https://github.com/apache/kafka/pull/13533 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax opened a new pull request, #13533: KAFKA-12446: update change encoding to use varint
mjsax opened a new pull request, #13533: URL: https://github.com/apache/kafka/pull/13533 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13476: KAFKA-14462; [4/N] Add GroupMetadataManager: ConsumerGroups Management, Members Management and Reconciliation Logic
jeffkbkim commented on code in PR #13476: URL: https://github.com/apache/kafka/pull/13476#discussion_r1161913599 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -0,0 +1,873 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.FencedMemberEpochException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.GroupMaxSizeReachedException; +import org.apache.kafka.common.errors.InvalidRequestException; +import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedAssignorException; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMemberAssignment; +import org.apache.kafka.coordinator.group.consumer.CurrentAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineHashSet; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newCurrentAssignmentTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupEpochRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newMemberSubscriptionTombstoneRecord; +import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord; + +/** + * The GroupMetadataManager manages the metadata of all generic and consumer groups. It holds + * the hard and the soft state of the groups. This class has two kinds of methods: Re
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader
guozhangwang commented on code in PR #13523: URL: https://github.com/apache/kafka/pull/13523#discussion_r1162110812 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -988,6 +1005,14 @@ private void prepareChangelogs(final Map tasks, } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); } + +final TaskId taskId = changelogMetadata.stateManager.taskId(); +final Task task = tasks.get(taskId); +// if the log is truncated between when we get the log end offset and when we get the +// consumer position, then it's possible that the difference become negative and there's actually +// no records to restore; in this case we just initialize the sensor to zero Review Comment: I was thinking about the case where the consumer's position is set as the checkpointed offset, whereas the log end offset was truncated after the checkpoint was written. Maybe I did not make that clear in the above comment? Will reword a bit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
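The truncation race described in the diff comment above reduces to a simple clamp: if the log end offset was fetched before a truncation and the consumer position after it, their difference can go negative, so the count of records left to restore is floored at zero. The method and variable names below are illustrative:

```java
// Sketch of the guard discussed above: never report a negative number of
// records to restore when the changelog was truncated between the two reads.
final class RestoreMath {
    static long recordsToRestore(long logEndOffset, long consumerPosition) {
        return Math.max(0L, logEndOffset - consumerPosition);
    }
}
```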
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162109668 ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -354,6 +361,7 @@ class RequestChannel(val queueSize: Int, private val processors = new ConcurrentHashMap[Int, Processor]() val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric) val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) + private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize) Review Comment: Let's do in a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader
guozhangwang commented on code in PR #13523: URL: https://github.com/apache/kafka/pull/13523#discussion_r1162107892 ## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ## @@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled); kafkaStreams.pause(); kafkaStreams.start(); -waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); +waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT); assertTrue(kafkaStreams.isPaused()); produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA); -waitUntilStreamsHasPolled(kafkaStreams, 2); Review Comment: Sorry my bad, it should still be there, will add back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …
cmccabe commented on code in PR #13280: URL: https://github.com/apache/kafka/pull/13280#discussion_r1162107641 ## metadata/src/main/java/org/apache/kafka/image/TopicsImage.java: ## @@ -21,41 +21,48 @@ import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.server.immutable.ImmutableMap; +import org.apache.kafka.server.immutable.ImmutableMapSetFactory; import org.apache.kafka.server.util.TranslatedValueMapView; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; - /** * Represents the topics in the metadata image. * * This class is thread-safe. */ public final class TopicsImage { -public static final TopicsImage EMPTY = -new TopicsImage(Collections.emptyMap(), Collections.emptyMap()); +private static final ImmutableMapSetFactory FACTORY = ImmutableMapSetFactory.PCOLLECTIONS_FACTORY; + +public static final TopicsImage EMPTY = new TopicsImage(FACTORY.emptyMap(), FACTORY.emptyMap()); -private final Map<Uuid, TopicImage> topicsById; -private final Map<String, TopicImage> topicsByName; +final ImmutableMap<Uuid, TopicImage> topicsById; Review Comment: I realize that this is immutable and very unlikely to ever be computed on the fly, but I would still really like to avoid direct field access. If we start doing stuff like this, people will tend to copy the pattern where it can cause harm (for example, where the map is mutable, or where we might want to compute the value on the fly). And there is no benefit here, so let's avoid it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13523: MINOR: Follow-up on failing streams test, and fix StoreChangelogReader
guozhangwang commented on code in PR #13523: URL: https://github.com/apache/kafka/pull/13523#discussion_r1162102863 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ## @@ -880,6 +896,9 @@ private void initializeChangelogs(final Map tasks, } private void addChangelogsToRestoreConsumer(final Set partitions) { +if (partitions.isEmpty()) Review Comment: Yes, that's right, but when `partitions.isEmpty()` we can actually skip the whole function, so I applied the check at a larger scope. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
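The early-return guard being discussed can be sketched as follows (a hypothetical stand-in for StoreChangelogReader, not the real class): bailing out at the top of the method skips the downstream consumer work entirely when there is nothing to add.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class RestoreConsumerSketch {
    final List<String> assigned = new ArrayList<>();
    int updateCalls = 0;   // counts (potentially expensive) consumer updates

    void addChangelogs(Set<String> partitions) {
        if (partitions.isEmpty())
            return;          // nothing to do: skip the whole function
        assigned.addAll(partitions);
        updateCalls++;
    }
}
```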
[GitHub] [kafka] cmccabe commented on a diff in pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
cmccabe commented on code in PR #13513: URL: https://github.com/apache/kafka/pull/13513#discussion_r1162102531 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -458,8 +458,9 @@ public void run() throws Exception { zkMigrationClient.readAllMetadata(batch -> { try { if (log.isTraceEnabled()) { -log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); +log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); Review Comment: It looks like you changed this for the purpose of debugging and forgot to change it back. BTW you can just change the test log4j.properties file to avoid this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
cmccabe commented on code in PR #13513: URL: https://github.com/apache/kafka/pull/13513#discussion_r1162102172 ## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java: ## @@ -458,8 +458,9 @@ public void run() throws Exception { zkMigrationClient.readAllMetadata(batch -> { try { if (log.isTraceEnabled()) { -log.trace("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); +log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); } else { +log.info("Migrating {} records from ZK: {}", batch.size(), recordBatchToString(batch)); Review Comment: This looks like a typo. Why is this the same as in the trace-enabled case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
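The idiom the two reviews above are pointing at can be sketched like this (a minimal stand-in, using a plain boolean instead of a real log4j/SLF4J logger): the expensive per-record dump is built only when TRACE is enabled, and the other branch logs a terser line rather than duplicating the dump.

```java
import java.util.List;

class MigrationLogging {
    // returns the message that would be logged at the given level
    static String logLine(boolean traceEnabled, List<String> batch) {
        if (traceEnabled) {
            // full (expensive) record dump, built only when TRACE is on
            return "Migrating " + batch.size() + " records from ZK: " + batch;
        } else {
            // terse line for normal levels; no per-record rendering
            return "Migrating " + batch.size() + " records from ZK";
        }
    }
}
```

For tests that need the verbose output, adjusting the test log4j.properties (as the review suggests) keeps production code at the intended level.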
[GitHub] [kafka] cmccabe commented on pull request #13513: KAFKA-14881: Update UserScramCredentialRecord for SCRAM ZK to KRaft migration
cmccabe commented on PR #13513: URL: https://github.com/apache/kafka/pull/13513#issuecomment-1502329503 > Should I move ScramCredentialData.java from metadata/src/main/java/org/apache/kafka/image to metadata/src/main/java/org/apache/kafka/metadata ? yes, please. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request, #13532: KAFKA-14887: No shutdown for ZK session expiration in feature processing
rondagostino opened a new pull request, #13532: URL: https://github.com/apache/kafka/pull/13532 `FinalizedFeatureChangeListener` shuts the broker down when it encounters an issue trying to process feature change events, but it does not distinguish between issues related to feature changes actually failing and other exceptions like ZooKeeper session expiration. This introduces the possibility that ZooKeeper session expiration could cause the broker to shut down, which is not intended. This patch updates the code to distinguish between these two types of exceptions; in the case of something like a ZK session expiration it logs a warning and continues. We shut down the broker only for `FeatureCacheUpdateException`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14887) ZK session timeout can cause broker to shutdown
Ron Dagostino created KAFKA-14887: - Summary: ZK session timeout can cause broker to shutdown Key: KAFKA-14887 URL: https://issues.apache.org/jira/browse/KAFKA-14887 Project: Kafka Issue Type: Improvement Affects Versions: 3.3.2, 3.3.1, 3.2.3, 3.2.2, 3.4.0, 3.2.1, 3.1.2, 3.0.2, 3.3.0, 3.1.1, 3.2.0, 2.8.2, 3.0.1, 3.0.0, 2.8.1, 2.7.2, 3.1.0, 2.7.1, 2.8.0, 2.7.0 Reporter: Ron Dagostino We have the following code in FinalizedFeatureChangeListener.scala which will exit regardless of the type of exception that is thrown when trying to process feature changes: case e: Exception => { error("Failed to process feature ZK node change event. The broker will eventually exit.", e) throw new FatalExitError(1) The issue here is that this does not distinguish between exceptions caused by an inability to process a feature change and an exception caused by a ZooKeeper session timeout. We want to shut the broker down for the former case, but we do NOT want to shut the broker down in the latter case; the ZooKeeper session will eventually be reestablished, and we can continue processing at that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
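The fix the JIRA and PR describe boils down to dispatching on the exception type. A hedged sketch (the exception classes here are local stand-ins mirroring the names in the discussion, not Kafka's real classes): only a genuine feature-cache update failure is fatal; anything transient, like a ZK session expiration, is warn-and-continue.

```java
class FeatureEventHandling {
    // stand-in for kafka's FeatureCacheUpdateException
    static class FeatureCacheUpdateException extends RuntimeException {}
    // stand-in for a transient ZooKeeper session problem
    static class ZkSessionExpiredException extends RuntimeException {}

    /** Returns true only when the broker should actually shut down. */
    static boolean shouldExit(Exception e) {
        // fatal only for a real feature-change processing failure;
        // a ZK session will eventually be reestablished, so keep running
        return e instanceof FeatureCacheUpdateException;
    }
}
```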
[GitHub] [kafka] mjsax commented on pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
mjsax commented on PR #10747: URL: https://github.com/apache/kafka/pull/10747#issuecomment-1502280780 Thanks for the quick turnaround! Merged. We can still do a follow-up on the open question regarding the serialization format. Using `Bytes.putInt()` should actually do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
mjsax merged PR #10747: URL: https://github.com/apache/kafka/pull/10747 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
mjsax commented on code in PR #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r1162058460 ## docs/streams/upgrade-guide.html: ## @@ -75,6 +75,20 @@ < changelogs. + +Downgrading from 3.5.x or newer version to 3.4.x or older version needs special attention: +Since 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics. +This means that older versions of Kafka Streams would not be able to recognize the bytes written by newer versions, +and hence it is harder to downgrade Kafka Streams with version 3.5.0 or newer to older versions in-flight. For +more details, please refer to https://cwiki.apache.org/confluence/x/P5VbDg";>KIP-904. + +For a downgrade, first switch the config from "upgrade.from" to the version you are downgrading to. +This disables writing of the new serialization format in your application. It's important to wait in this state +log enough to make sure that the application has finished processing any "in-flight" messages written Review Comment: ```suggestion long enough to make sure that the application has finished processing any "in-flight" messages written ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
mjsax commented on code in PR #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r1162056271 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java: ## @@ -76,6 +79,48 @@ public boolean enableSendingOldValues(final boolean forceMaterialization) { private class KTableMapProcessor extends ContextualProcessor, K1, Change> { +private boolean isNotUpgrade; + +@SuppressWarnings("checkstyle:cyclomaticComplexity") +private boolean isNotUpgrade(final Map configs) { Review Comment: Maybe the inner class would need to be `static`, too? Not sure. Also not too important. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #10747: KAFKA-12446: Call subtractor before adder if key is the same
mjsax commented on code in PR #10747: URL: https://github.com/apache/kafka/pull/10747#discussion_r1162055509 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java: ## @@ -45,34 +49,87 @@ public void setIfUnset(final SerdeGetter getter) { } } +@SuppressWarnings("checkstyle:cyclomaticComplexity") +private boolean isUpgrade(final Map configs) { +final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG); +if (upgradeFrom == null) { +return false; +} + +switch ((String) upgradeFrom) { +case StreamsConfig.UPGRADE_FROM_0100: +case StreamsConfig.UPGRADE_FROM_0101: +case StreamsConfig.UPGRADE_FROM_0102: +case StreamsConfig.UPGRADE_FROM_0110: +case StreamsConfig.UPGRADE_FROM_10: +case StreamsConfig.UPGRADE_FROM_11: +case StreamsConfig.UPGRADE_FROM_20: +case StreamsConfig.UPGRADE_FROM_21: +case StreamsConfig.UPGRADE_FROM_22: +case StreamsConfig.UPGRADE_FROM_23: +case StreamsConfig.UPGRADE_FROM_24: +case StreamsConfig.UPGRADE_FROM_25: +case StreamsConfig.UPGRADE_FROM_26: +case StreamsConfig.UPGRADE_FROM_27: +case StreamsConfig.UPGRADE_FROM_28: +case StreamsConfig.UPGRADE_FROM_30: +case StreamsConfig.UPGRADE_FROM_31: +case StreamsConfig.UPGRADE_FROM_32: +case StreamsConfig.UPGRADE_FROM_33: +case StreamsConfig.UPGRADE_FROM_34: +return true; +default: +return false; +} +} + +@Override +public void configure(final Map configs, final boolean isKey) { +this.isUpgrade = isUpgrade(configs); +} + /** * @throws StreamsException if both old and new values of data are null, or if - * both values are not null + * both values are not null and is upgrading from a version less than 3.4 */ @Override public byte[] serialize(final String topic, final Headers headers, final Change data) { -final byte[] serializedKey; - -// only one of the old / new values would be not null -if (data.newValue != null) { -if (data.oldValue != null) { +final boolean oldValueIsNull = data.oldValue == null; +final boolean newValueIsNull = data.newValue == null; + 
+final byte[] newData = inner.serialize(topic, headers, data.newValue); +final byte[] oldData = inner.serialize(topic, headers, data.oldValue); + +final int newDataLength = newValueIsNull ? 0 : newData.length; +final int oldDataLength = oldValueIsNull ? 0 : oldData.length; + +// The serialization format is: +// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0} +// {BYTE_ARRAY newValue}{BYTE newOldFlag=1} +// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2} +final ByteBuffer buf; +if (!newValueIsNull && !oldValueIsNull) { +if (isUpgrade) { throw new StreamsException("Both old and new values are not null (" + data.oldValue -+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed."); ++ " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading."); +} else { +final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE; +buf = ByteBuffer.allocate(capacity); +ByteUtils.writeUnsignedInt(buf, newDataLength); Review Comment: I would actually recommend to just remove it from the KIP -- it seem to be an implementation detail. \cc @guozhangwang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
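The wire layout quoted in the diff can be sketched as a standalone encoder (a hedged illustration of the KIP-904 format, not Kafka's actual ChangedSerializer): flag 0 means old value only, flag 1 means new value only, and flag 2 means both values, prefixed by the new value's length so the decoder knows where it ends.

```java
import java.nio.ByteBuffer;

class ChangeEncoding {
    static byte[] encode(byte[] newData, byte[] oldData) {
        if (newData != null && oldData != null) {
            // {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{flag=2}
            ByteBuffer buf = ByteBuffer.allocate(4 + newData.length + oldData.length + 1);
            buf.putInt(newData.length);   // length prefix (values < 2^31 assumed here)
            buf.put(newData);
            buf.put(oldData);
            buf.put((byte) 2);            // newOldFlag = 2: both present
            return buf.array();
        } else if (newData != null) {
            // {BYTE_ARRAY newValue}{flag=1}
            ByteBuffer buf = ByteBuffer.allocate(newData.length + 1);
            buf.put(newData).put((byte) 1);
            return buf.array();
        } else if (oldData != null) {
            // {BYTE_ARRAY oldValue}{flag=0}
            ByteBuffer buf = ByteBuffer.allocate(oldData.length + 1);
            buf.put(oldData).put((byte) 0);
            return buf.array();
        }
        throw new IllegalArgumentException("both old and new values are null");
    }
}
```

Note the length prefix is only needed in the "both" case; the single-value layouts put the flag byte last, so the payload is simply everything before it.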
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162049144 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +nodesToTransactions.synchronized { + // Check if we have already (either node or individual transaction). Add the Node if it isn't there. 
+ val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, +new TransactionDataAndCallbacks( + new AddPartitionsToTxnTransactionCollection(1), + mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) + + val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + + // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and + // reconnected so return the retriable network exception. + if (currentTransactionData != null) { +val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH +else + Errors.NETWORK_EXCEPTION +val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() +currentTransactionData.topics().forEach { topic => + topic.partitions().forEach { partition => +topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) + } +} +val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) +currentNodeAndTransactionData.transactionData.remove(transactionData) +oldCallback(topicPartitionsToError.toMap) + } + currentNodeAndTransactionData.transactionData.add(transactionData) + currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + wakeup() +} + } + + private class AddPartitionsToTxnHandler(node: Node, transactionDataAndCallbacks: TransactionDataAndCallbacks) extends RequestCompletionHandler { +override def onComplete(response: ClientResponse): Unit = { + // Note: Synchronization is not needed on inflightNodes since it is always accessed from this thread. 
+ inflightNodes.remove(node) + if (response.authenticationException() != null) { +error(s"AddPartitionsToTxnRequest failed for broker ${config.brokerId} with an " + + "authentication exception.", response.authenticationException) +transactionDataAndCallbacks.callbacks.foreach { case (txnId, callback) => + callback(buildErrorMap(txnId, transactionDataAndCallbacks.transactionData, Errors.forException
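The duplicate-transaction decision in `addTxnData` above can be isolated into a small sketch (error names are local stand-in enum constants, not `org.apache.kafka.common.protocol.Errors`): a bumped epoch fences the older in-flight attempt with an invalid-producer-epoch error, while the same epoch is treated as a client reconnect and gets a retriable network error.

```java
class DuplicateTxnDecision {
    enum Error { INVALID_PRODUCER_EPOCH, NETWORK_EXCEPTION }

    // error returned for the *existing* (older) attempt when a second
    // AddPartitionsToTxn arrives for the same transactional id
    static Error errorForDuplicate(short existingEpoch, short incomingEpoch) {
        return existingEpoch < incomingEpoch
            ? Error.INVALID_PRODUCER_EPOCH   // epoch bump: older attempt is fenced
            : Error.NETWORK_EXCEPTION;       // same epoch: likely a reconnect, retriable
    }
}
```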
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162047588 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int, completeShutdown() return +case callback: RequestChannel.CallbackRequest => + try { +val originalRequest = callback.originalRequest + +// If we've already executed a callback for this request, reset the times and subtract the callback time from the +// new dequeue time. This will allow calculation of multiple callback times. +// Otherwise, set dequeue time to now. +if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) { + val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L) + originalRequest.callbackRequestCompleteTimeNanos = None + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos) +} else { + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds()) +} + +currentRequest.set(originalRequest) +callback.fun() +if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty) + originalRequest.callbackRequestCompleteTimeNanos = Some(time.nanoseconds()) + } catch { +case e: FatalExitError => + completeShutdown() + Exit.exit(e.statusCode) +case e: Throwable => error("Exception when handling request", e) + } finally { +currentRequest.remove() + } + case request: RequestChannel.Request => try { request.requestDequeueTimeNanos = endTime trace(s"Kafka request handler $id on broker $brokerId handling request $request") +currentRequest.set(request) apis.handle(request, requestLocal) } catch { case e: FatalExitError => completeShutdown() Exit.exit(e.statusCode) case e: Throwable => error("Exception when handling request", e) } finally { +currentRequest.remove() request.releaseBuffer() } +case RequestChannel.WakeupRequest => // We should handle this in receiveRequest by polling 
callbackQueue. Review Comment: We can add a warning log. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
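The multi-callback timing arithmetic in the diff above can be sketched on its own (field names are local stand-ins for the request's `callbackRequestDequeueTimeNanos` / `callbackRequestCompleteTimeNanos`): time already spent in earlier callbacks is subtracted from the new dequeue timestamp, so the eventual `complete - dequeue` span accumulates across callbacks.

```java
class CallbackTiming {
    Long dequeueNanos;   // stand-in for callbackRequestDequeueTimeNanos
    Long completeNanos;  // stand-in for callbackRequestCompleteTimeNanos

    void onCallbackDequeued(long nowNanos) {
        if (dequeueNanos != null) {
            // duration of the previous callback(s) of this request
            long prev = (completeNanos != null ? completeNanos : 0L) - dequeueNanos;
            completeNanos = null;
            // fold prior callback time into the new dequeue timestamp
            dequeueNanos = nowNanos - prev;
        } else {
            dequeueNanos = nowNanos;
        }
    }
}
```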
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162047387 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int, completeShutdown() return +case callback: RequestChannel.CallbackRequest => + try { +val originalRequest = callback.originalRequest + +// If we've already executed a callback for this request, reset the times and subtract the callback time from the +// new dequeue time. This will allow calculation of multiple callback times. +// Otherwise, set dequeue time to now. +if (originalRequest.callbackRequestDequeueTimeNanos.isDefined) { + val prevCallbacksTimeNanos = originalRequest.callbackRequestCompleteTimeNanos.getOrElse(0L) - originalRequest.callbackRequestDequeueTimeNanos.getOrElse(0L) + originalRequest.callbackRequestCompleteTimeNanos = None + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds() - prevCallbacksTimeNanos) +} else { + originalRequest.callbackRequestDequeueTimeNanos = Some(time.nanoseconds()) +} + +currentRequest.set(originalRequest) +callback.fun() +if (originalRequest.callbackRequestCompleteTimeNanos.isEmpty) Review Comment: It is not true if we returned a response. We also update the value there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162046736 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -69,20 +108,52 @@ class KafkaRequestHandler(id: Int, completeShutdown() return +case callback: RequestChannel.CallbackRequest => + try { +val originalRequest = callback.originalRequest + +// If we've already executed a callback for this request, reset the times and subtract the callback time from the +// new dequeue time. This will allow calculation of multiple callback times. Review Comment: Artem requested this. See comment here. https://github.com/apache/kafka/pull/13391#discussion_r1160849589 There is currently not a way to prevent infinite callbacks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162045551 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -35,6 +36,43 @@ trait ApiRequestHandler { def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit } +object KafkaRequestHandler { + // Support for scheduling callbacks on a request thread. + private val threadRequestChannel = new ThreadLocal[RequestChannel] + private val currentRequest = new ThreadLocal[RequestChannel.Request] + + // For testing + private var bypassThreadCheck = false Review Comment: We can make it volatile, but this is only really used in tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
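The thread-local bookkeeping being reviewed can be sketched like this (a hypothetical stand-in for KafkaRequestHandler's `currentRequest`, with a String in place of `RequestChannel.Request`): each handler thread records the request it is currently serving so that callbacks scheduled later can find it, and a finally-style `clear()` mirrors the handler loop's cleanup.

```java
class CurrentRequestHolder {
    private static final ThreadLocal<String> currentRequest = new ThreadLocal<>();

    static void set(String request) { currentRequest.set(request); }
    static String get() { return currentRequest.get(); }
    // mirror of the finally-block cleanup in the handler loop
    static void clear() { currentRequest.remove(); }
}
```

Because a ThreadLocal is confined to its owning thread, reads and writes on the handler thread need no synchronization; only flags read across threads (like the `bypassThreadCheck` discussed above) would benefit from `volatile`.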
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162045206 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +nodesToTransactions.synchronized { + // Check if we have already (either node or individual transaction). Add the Node if it isn't there. 
+ val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, +new TransactionDataAndCallbacks( + new AddPartitionsToTxnTransactionCollection(1), + mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) + + val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + + // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and + // reconnected so return the retriable network exception. + if (currentTransactionData != null) { +val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH +else + Errors.NETWORK_EXCEPTION Review Comment: ^ ditto comment about retriable errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162044614 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -616,66 +619,128 @@ class ReplicaManager(val config: KafkaConfig, responseCallback: Map[TopicPartition, PartitionResponse] => Unit, delayedProduceLock: Option[Lock] = None, recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), -requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = { +requestLocal: RequestLocal = RequestLocal.NoCaching, +transactionalId: String = null, +transactionStatePartition: Option[Int] = None): Unit = { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds - val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, -origin, entriesPerPartition, requiredAcks, requestLocal) - debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) - - val produceStatus = localProduceResults.map { case (topicPartition, result) => -topicPartition -> ProducePartitionStatus( - result.info.lastOffset + 1, // required offset - new PartitionResponse( -result.error, -result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), -result.info.logAppendTime, -result.info.logStartOffset, -result.info.recordErrors, -result.info.errorMessage + + val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = +if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable) + (entriesPerPartition, Map.empty) +else + entriesPerPartition.partition { case (topicPartition, records) => + getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId()) + } + + def appendEntries(allEntries: Map[TopicPartition, MemoryRecords])(unverifiedEntries: Map[TopicPartition, Errors]): Unit = { +val verifiedEntries = + if (unverifiedEntries.isEmpty) +allEntries + else +allEntries.filter { case (tp, _) => + !unverifiedEntries.contains(tp) +} + +val 
localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed, + origin, verifiedEntries, requiredAcks, requestLocal) +debug("Produce to local log in %d ms".format(time.milliseconds - sTime)) + +val unverifiedResults = unverifiedEntries.map { case (topicPartition, error) => + // NOTE: Older clients return INVALID_RECORD, but newer clients will return INVALID_TXN_STATE + val message = if (error.equals(Errors.INVALID_RECORD)) "Partition was not added to the transaction" else error.message() + topicPartition -> LogAppendResult( +LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, +Some(error.exception(message)) ) -) // response status - } +} + +val allResults = localProduceResults ++ unverifiedResults + +val produceStatus = allResults.map { case (topicPartition, result) => + topicPartition -> ProducePartitionStatus( +result.info.lastOffset + 1, // required offset +new PartitionResponse( + result.error, + result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), + result.info.logAppendTime, + result.info.logStartOffset, + result.info.recordErrors, + result.info.errorMessage +) + ) // response status +} - actionQueue.add { -() => - localProduceResults.foreach { -case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { -case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) -case LeaderHwChange.NONE => +actionQueue.add { + () => +allResults.foreach { + case (topicPartition, result) => +val requestKey = TopicPartitionOperationKey(topicPartition) +result.info.leaderHwChang
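The partition step in `appendRecords` above splits entries into those whose producer already has an ongoing transaction (appended directly) and those that still need verification. A hedged generic sketch (`hasOngoingTxn` is a stand-in predicate for the real `Partition.hasOngoingTransaction` check):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

class EntryPartitioning {
    // key true -> verified entries, key false -> entries needing verification
    static <K, V> Map<Boolean, Map<K, V>> split(Map<K, V> entries,
                                                Predicate<K> hasOngoingTxn) {
        Map<K, V> verified = new HashMap<>();
        Map<K, V> unverified = new HashMap<>();
        entries.forEach((k, v) ->
            (hasOngoingTxn.test(k) ? verified : unverified).put(k, v));
        Map<Boolean, Map<K, V>> out = new HashMap<>();
        out.put(true, verified);
        out.put(false, unverified);
        return out;
    }
}
```

After the verification round trip completes, the two result maps are merged back together (the `allResults = localProduceResults ++ unverifiedResults` step in the diff) before building the produce response.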
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1162043968 ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -29,7 +29,7 @@ // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 and is still // under developement. Hence, the API is not exposed by default by brokers // unless explicitely enabled. - "latestVersionUnstable": true, + "latestVersionUnstable": false, Review Comment: I will update the comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
junrao commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161909242 ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AddPartitionsToTxnRequest, AddPartitionsToTxnResponse} +import org.apache.kafka.common.utils.Time + +import scala.collection.mutable + +object AddPartitionsToTxnManager { + type AppendCallback = Map[TopicPartition, Errors] => Unit +} + + +class TransactionDataAndCallbacks(val transactionData: AddPartitionsToTxnTransactionCollection, + val callbacks: mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]) + + +class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time: Time) + extends InterBrokerSendThread("AddPartitionsToTxnSenderThread-" + config.brokerId, client, config.requestTimeoutMs, time) { + + private val inflightNodes = mutable.HashSet[Node]() + private val nodesToTransactions = mutable.Map[Node, TransactionDataAndCallbacks]() + + def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { +nodesToTransactions.synchronized { Review Comment: Our long term goal is to replace the scala code with java. Could we write this new class and the corresponding test in java? ## clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json: ## @@ -29,7 +29,7 @@ // The AddPartitionsToTxnRequest version 4 API is added as part of KIP-890 and is still // under developement. Hence, the API is not exposed by default by brokers // unless explicitely enabled. 
- "latestVersionUnstable": true, + "latestVersionUnstable": false, Review Comment: The above comment still says "is still under developement". Is the latest version indeed stable? Or should we change the comment accordingly? ## core/src/main/scala/kafka/network/RequestChannel.scala: ## @@ -354,6 +361,7 @@ class RequestChannel(val queueSize: Int, private val processors = new ConcurrentHashMap[Int, Processor]() val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric) val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) + private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize) Review Comment: This seems to be a more general mechanism than ActionQueue. Could we move all existing ActionQueue usage to callback queue and get rid of ActionQueue? This could be done in a separate PR. ## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ## @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestComp
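The `AddPartitionsToTxnManager` reviewed above queues verification data per destination broker and remembers a callback per transactional id, so one inter-broker request can carry many transactions. A rough Python sketch of that batching structure (hypothetical names; the real class is a Scala `InterBrokerSendThread`):

```python
from collections import defaultdict

class TxnVerificationBatcher:
    """Queue AddPartitionsToTxn data per node; at most one in-flight request per node."""
    def __init__(self):
        self.nodes_to_txns = defaultdict(lambda: {"txns": {}, "callbacks": {}})
        self.inflight = set()

    def add_txn_data(self, node, txn_id, partitions, callback):
        entry = self.nodes_to_txns[node]
        entry["txns"].setdefault(txn_id, set()).update(partitions)  # coalesce repeats
        entry["callbacks"][txn_id] = callback

    def drain(self, node):
        """Take the batched request for a node, unless one is already in flight."""
        if node in self.inflight or node not in self.nodes_to_txns:
            return None
        self.inflight.add(node)
        return self.nodes_to_txns.pop(node)

    def complete(self, node):
        self.inflight.discard(node)  # response received: node may be drained again
```

This is only the bookkeeping half; the real manager also builds `AddPartitionsToTxnRequest`s from the drained batch and invokes the stored callbacks with per-partition errors when the response arrives.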
[GitHub] [kafka] sql888 commented on pull request #12006: KAFKA-13794: Follow up to fix comparator
sql888 commented on PR #12006: URL: https://github.com/apache/kafka/pull/12006#issuecomment-1502205546 > Failed tests are unrelated > > ``` > Build / ARM / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() > ``` How can I fix this failing test? I am getting this on an M1 ARM MacBook. Thanks
[GitHub] [kafka] SpacRocket commented on pull request #13382: KAFKA-14722: Make BooleanSerde public
SpacRocket commented on PR #13382: URL: https://github.com/apache/kafka/pull/13382#issuecomment-1502183087 @mjsax Sure, thanks for the suggestion. I've pushed a new commit in this pull request: https://github.com/apache/kafka/pull/13491
[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710259#comment-17710259 ] Sergei Morozov commented on KAFKA-14750: [~sagarrao] thank you for digging into this and providing the details. I would agree that a solution that will cause an overhead is not the best for this corner case. I'm not well familiar with the Kafka internals, so I don't understand why this issue is reproducible only during a mass topic deletion (which makes it a corner case). As a general idea, if a client fails to retrieve the position, and we know that at this point the topic may no longer exist, could we just refresh the cache on the client side via the admin client and only fail if the topic still exists? > Sink connector fails if a topic matching its topics.regex gets deleted > -- > > Key: KAFKA-14750 > URL: https://issues.apache.org/jira/browse/KAFKA-14750 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.1 >Reporter: Sergei Morozov >Assignee: Sagar Rao >Priority: Major > > Steps to reproduce: > # In {{{}config/connect-standalone.properties{}}}, set: > {code:bash} > plugin.path=libs/connect-file-3.3.1.jar > {code} > # In {{{}config/connect-file-sink.properties{}}}, remove the {{topics=}} line > and add this one: > {code:bash} > topics.regex=connect-test-.* > {code} > # Start zookeeper: > {code:bash} > bin/zookeeper-server-start.sh config/zookeeper.properties > {code} > # Start the brokers: > {code:bash} > bin/kafka-server-start.sh config/server.properties > {code} > # Start the file sink connector: > {code:bash} > bin/connect-standalone.sh config/connect-standalone.properties > config/connect-file-sink.properties > {code} > # Create topics for the sink connector to subscribe to: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --create \ > --topic 
connect-test-$j > done & > done > wait > {code} > # Wait until all the created topics are assigned to the connector. Check the > number of partitions to be > 0 in the output of: > {code:bash} > bin/kafka-consumer-groups.sh \ > --bootstrap-server localhost:9092 \ > --group connect-local-file-sink \ > --describe --members > {code} > # Delete the created topics: > {code:bash} > for i in {0..2}; do > for j in $(seq $(($i * 100)) $(( ($i + 1) * 100 - 1 ))); do > bin/kafka-topics.sh \ > --bootstrap-server localhost:9092 \ > --delete \ > --topic connect-test-$j > echo Deleted topic connect-test-$j. > done & > done > wait > {code} > # Observe the connector fail with the following error: > {quote}org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms > expired before the position for partition connect-test-211-0 could be > determined > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
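The mitigation Sergei sketches — on a position-retrieval timeout, refresh metadata via the admin client and fail only if the topic still exists — could look roughly like this. The helper names are hypothetical, not Connect's actual API:

```python
def position_or_ignore(tp, fetch_position, topic_exists):
    """Try to fetch a consumer position; if the topic was deleted while we
    waited, treat the partition as gone instead of failing the sink task."""
    try:
        return fetch_position(tp)
    except TimeoutError:
        if topic_exists(tp[0]):   # refresh metadata via the admin client
            raise                 # real problem: the topic is still there
        return None               # topic deleted: drop the partition quietly
```

The extra admin-client lookup happens only on the failure path, so it adds no overhead to the common case — which is the concern raised earlier in the thread.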
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161946467 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## Review Comment: Added a line to the ReplicaManager test to see that we return early on the error in the callback.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
guozhangwang commented on code in PR #11433: URL: https://github.com/apache/kafka/pull/11433#discussion_r1161940023 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -342,7 +342,18 @@ public void handleAssignment(final Map> activeTasks, maybeThrowTaskExceptions(taskCloseExceptions); -createNewTasks(activeTasksToCreate, standbyTasksToCreate); +final Collection newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate); +// If there are any transactions in flight and there are newly created active tasks, commit the tasks +// to avoid potential long restoration times. +if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) { +log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks."); +final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>()); +if (numCommitted == -1) { Review Comment: Hmm.. that was not expected, did you check the trace and find out why the `onAssignment` was not triggered, in which it would set `rebalanceInProgress = true`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
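The guard in the diff above — commit only when running exactly-once v2, a transaction is in flight, and new active tasks were actually created — reduces to a small predicate. A hedged Python sketch of just that condition:

```python
EXACTLY_ONCE_V2 = "exactly_once_v2"

def should_commit_after_assignment(processing_mode, txn_in_flight, new_active_tasks):
    """Mirror of the condition above: commit eagerly so an open transaction
    cannot keep newly assigned tasks blocked in restoration until the next
    regular commit interval."""
    return (processing_mode == EXACTLY_ONCE_V2
            and txn_in_flight
            and len(new_active_tasks) > 0)
```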
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
gharris1727 commented on code in PR #13165: URL: https://github.com/apache/kafka/pull/13165#discussion_r1161929533 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath( builder.useParallelExecutor(); Reflections reflections = new InternalReflections(builder); -return new PluginScanResult( -getPluginDesc(reflections, SinkConnector.class, loader), -getPluginDesc(reflections, SourceConnector.class, loader), -getPluginDesc(reflections, Converter.class, loader), -getPluginDesc(reflections, HeaderConverter.class, loader), -getTransformationPluginDesc(loader, reflections), -getPredicatePluginDesc(loader, reflections), -getServiceLoaderPluginDesc(ConfigProvider.class, loader), -getServiceLoaderPluginDesc(ConnectRestExtension.class, loader), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader) -); +ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader); Review Comment: > I don't see a strong reason why it's not [static]. It's non-static to make mocking easier. Rather than having to mock a static method of a class, you mock the Plugins instance, and stub out the loader swapping functionality. It appears that there are only a handful of places where compareAndSwapLoaders (and compareAndSwapWithDelegatingLoader) is used: * In DelegatingClassLoader, during initialization * In AbstractConnectCli and MirrorMaker to swap to the delegating classloader * In EmbeddedConnectCluster to swap back to the saved loader (KAFKA-12229) I think that the EmbeddedConnectCluster call-site is just a result of the open-ended delegating swaps. I'll refactor all of these call-sites to use LoaderSwap, and hide the more dangerous compareAndSwapLoaders now that only LoaderSwap is using it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13524: KIP-848-Interface changes
jeffkbkim commented on code in PR #13524: URL: https://github.com/apache/kafka/pull/13524#discussion_r1161928058 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/common/TopicIdToPartition.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.common; + +import org.apache.kafka.common.Uuid; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +public class TopicIdToPartition { +private final Uuid topicId; +private final Integer partition; +private final Optional<List<String>> rackIds; + +public TopicIdToPartition(Uuid topicId, Integer topicPartition, Optional<List<String>> rackIds) { +this.topicId = Objects.requireNonNull(topicId, "topicId can not be null"); +this.partition = Objects.requireNonNull(topicPartition, "topicPartition can not be null"); +this.rackIds = rackIds; Review Comment: this should also be not null. If rackIds is empty, we should pass in Optional.empty()
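jeffkbkim's point — `rackIds` should be required non-null, with `Optional.empty()` for the absent case — is the usual "no null collections" rule for value types. A Python analogue using explicit None-rejection (hypothetical class name, not the PR's actual Java):

```python
class TopicIdPartition:
    """Value type: every field is required; absence is an empty tuple, not None."""
    def __init__(self, topic_id, partition, rack_ids=()):
        if topic_id is None:
            raise ValueError("topicId can not be null")
        if partition is None:
            raise ValueError("topicPartition can not be null")
        if rack_ids is None:
            raise ValueError("rackIds can not be null; pass an empty collection")
        self.topic_id = topic_id
        self.partition = partition
        self.rack_ids = tuple(rack_ids)  # defensive copy, like an immutable List
```

Rejecting null at construction keeps the invariant in one place, so every consumer of the object can skip the null check.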
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161896585 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## Review Comment: We do have tests from the previous PR that return errors if the partition is not added to the txn. See https://github.com/apache/kafka/commit/29a1a16668d76a1cc04ec9e39ea13026f2dce1de
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161874495 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -572,6 +572,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, result } + def hasOngoingTransaction(producerId: Long): Boolean = lock synchronized { +val entry = producerStateManager.activeProducers.get(producerId) +entry != null && entry.currentTxnFirstOffset.isPresent Review Comment: This is a Java map, so that doesn't work. I can convert to scala, but not sure that is much better.
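`hasOngoingTransaction` above is a null-safe two-step lookup: the producer must be in the active-producers map, and its current transaction must have a recorded first offset. The same check in Python, with a plain dict standing in for the Java map:

```python
def has_ongoing_transaction(active_producers, producer_id):
    """True iff the producer is known and has an open transaction
    (i.e. a first offset was recorded for the current transaction)."""
    entry = active_producers.get(producer_id)  # None plays the role of the null check
    return entry is not None and entry.get("current_txn_first_offset") is not None
```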
[jira] [Updated] (KAFKA-14886) Broker request handler thread pool is full due to single request slowdown
[ https://issues.apache.org/jira/browse/KAFKA-14886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haoze Wu updated KAFKA-14886: - Description: In Kafka-2.8.0, we found that the number of data plane Kafka request handlers may quickly reach the limit when only one request is stuck. As a result, all other requests that require a data plane request handler will be stuck. When there is a slowdown inside the storeOffsets function at line 777 due to I/O operation, the thread holds the lock acquired at line 754. {code:java} private def doCommitOffsets(group: GroupMetadata, memberId: String, groupInstanceId: Option[String], generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { group.inLock { // Line 754 .. groupManager.storeOffsets() // Line 777 .. } } {code} Its call stack is: {code:java} kafka.coordinator.group.GroupMetadata,inLock,227 kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755 kafka.server.KafkaApis,handleOffsetCommitRequest,515 kafka.server.KafkaApis,handle,175 kafka.server.KafkaRequestHandler,run,74 java.lang.Thread,run,748 {code} This happens when the broker is handling the commit offset request from the consumer. When the slowdown mentioned above makes consumers get no response back, the consumer will automatically resend the request to the broker. Note that each request from the consumer is handled by a data-plane-kafka-request-handler thread. Therefore, another data-plane-kafka-request-handler thread will be also stuck at line 754 when handling the retry requests, because it tries to acquire the very same lock of the consumer group. The retry will occur repeatedly, and none of them can succeed. As a result, the pool of data-plane-kafka-request-handler threads will be full. Note that this pool of threads is responsible for handling all such requests from all producers and consumers. 
As a result, all the producers and consumers would be affected. However, the backoff mechanism might be able to solve this issue, by reducing the number of requests in a short time and reserving more slots in the thread pool. Therefore, we increase the backoff config “retry-backoff-ms”, to see if the issue disappears. Specifically, we increase the retry backoff from 100ms (default) to 1000ms in consumer’s config. However, we found that the mentioned thread pool is full again, because there are multiple heartbeat requests that take up the slots of this thread pool. All those heartbeat request handling is stuck when they are acquiring the same consumer group lock, which has been acquired at line 754 as mentioned. Specifically, the heartbeat handling is stuck at GroupCoordinator.handleHeartbeat@624: {code:java} def handleHeartbeat(groupId: String, memberId: String, groupInstanceId: Option[String], generationId: Int, responseCallback: Errors => Unit): Unit = { .. case Some(group) => group.inLock { // Line 624 .. } .. } {code} The heartbeat requests are sent at the interval of 3000ms (by default) from the consumer. It has no backoff mechanism. The thread pool for data-plane-kafka-request-handler will be full soon. Fix: Instead of waiting for the lock, we can just try to acquire the lock (probably with a time limit). If the acquisition fails, this request can be discarded so that other requests (which include the retry of the discarded one) can be processed. However, we feel this fix would affect the semantic of many operations. We would like to hear some suggestions from the community.
[jira] [Created] (KAFKA-14886) Broker request handler thread pool is full due to single request slowdown
Haoze Wu created KAFKA-14886: Summary: Broker request handler thread pool is full due to single request slowdown Key: KAFKA-14886 URL: https://issues.apache.org/jira/browse/KAFKA-14886 Project: Kafka Issue Type: Improvement Affects Versions: 2.8.0 Reporter: Haoze Wu In Kafka-2.8.0, we found that the number of data plane Kafka request handlers may quickly reach the limit when only one request is stuck. As a result, all other requests that require a data plane request handler will be stuck. When there is a slowdown inside the storeOffsets function at line 777 due to I/O operation, the thread holds the lock acquired at line 754. {code:java} private def doCommitOffsets(group: GroupMetadata, memberId: String, groupInstanceId: Option[String], generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { group.inLock { // Line 754 .. groupManager.storeOffsets() // Line 777 .. } } {code} Its call stack is: {code:java} kafka.coordinator.group.GroupMetadata,inLock,227 kafka.coordinator.group.GroupCoordinator,handleCommitOffsets,755 kafka.server.KafkaApis,handleOffsetCommitRequest,515 kafka.server.KafkaApis,handle,175 kafka.server.KafkaRequestHandler,run,74 java.lang.Thread,run,748 {code} This happens when the broker is handling the commit offset request from the consumer. When the slowdown mentioned above makes consumers get no response back, the consumer will automatically resend the request to the broker. Note that each request from the consumer is handled by a data-plane-kafka-request-handler thread. Therefore, another data-plane-kafka-request-handler thread will be also stuck at line 754 when handling the retry requests, because it tries to acquire the very same lock of the consumer group. The retry will occur repeatedly, and none of them can succeed. As a result, the pool of data-plane-kafka-request-handler threads will be full. 
Note that this pool of threads is responsible for handling all such requests from all producers and consumers. As a result, all the producers and consumers would be affected. However, the backoff mechanism might be able to solve this issue, by reducing the number of requests in a short time and reserving more slots in the thread pool. Therefore, we increase the backoff config “retry-backoff-ms”, to see if the issue disappears. Specifically, we increase the retry backoff from 100ms (default) to 1000ms in consumer’s config. However, we found that the mentioned thread pool is full again, because there are multiple heartbeat requests that take up the slots of this thread pool. All those heartbeat request handling is stuck when they are acquiring the same consumer group lock, which has been acquired at line 754 as mentioned. Specifically, the heartbeat handling is stuck at GroupCoordinator.handleHeartbeat@624: {code:java} def handleHeartbeat(groupId: String, memberId: String, groupInstanceId: Option[String], generationId: Int, responseCallback: Errors => Unit): Unit = { .. case Some(group) => group.inLock { // Line 624 .. } .. } {code} The heartbeat requests are sent at the interval of 3000ms (by default) from the consumer. It has no backoff mechanism. The thread pool for data-plane-kafka-request-handler will be full soon. Fix: Instead of waiting for the lock, we can just try to acquire the lock (probably with a time limit). If the acquisition fails, this request can be discarded so that other requests (which include the retry of the discarded one) can be processed. However, we feel this fix would affect the semantic of many operations. We would like to hear some suggestions from the community. -- This message was sent by Atlassian Jira (v8.20.10#820010)
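The fix the report proposes — try to acquire the group lock with a time limit and reject the request instead of parking a handler thread — looks like this in outline, with Python's `Lock.acquire(timeout=...)` standing in for the consumer-group lock:

```python
import threading

group_lock = threading.Lock()  # stand-in for one consumer group's lock

def handle_commit_offsets(work, timeout_s=0.05):
    """Reject instead of blocking: a stuck lock holder can no longer pin
    every request-handler thread on the same consumer-group lock."""
    if not group_lock.acquire(timeout=timeout_s):
        return "REJECTED"        # the client will retry with backoff
    try:
        return work()
    finally:
        group_lock.release()
```

As the report itself notes, this changes semantics — a rejected commit or heartbeat must surface a retriable error to the client — which is why the ticket asks for community input rather than presenting this as the fix.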
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161854813 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## Review Comment: Actually hmm -- I suppose this test is not present if you mean the exact path of returning the error and not producing to the log. I really did think I added such a test to the ReplicaManager test. I can try to add this path.
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161847053 ## core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala: ## Review Comment: Depends what you mean here. If you mean a unit test -- yes. If you mean an integration test, no, because the correct behavior is built into the producer.
[GitHub] [kafka] jolshan commented on a diff in pull request #13391: KAFKA-14561: Improve transactions experience for older clients by ensuring ongoing transaction
jolshan commented on code in PR #13391: URL: https://github.com/apache/kafka/pull/13391#discussion_r1161846085 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -35,6 +36,43 @@ trait ApiRequestHandler { def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit } +object KafkaRequestHandler { + // Support for scheduling callbacks on a request thread. + private val threadRequestChannel = new ThreadLocal[RequestChannel] + private val currentRequest = new ThreadLocal[RequestChannel.Request] + + // For testing + private var bypassThreadCheck = false + def setBypassThreadCheck(bypassCheck: Boolean): Unit = { +bypassThreadCheck = bypassCheck + } + + def currentRequestOnThread(): RequestChannel.Request = { +currentRequest.get() + } + + /** + * Wrap callback to schedule it on a request thread. + * NOTE: this function must be called on a request thread. + * @param fun Callback function to execute + * @return Wrapped callback that would execute `fun` on a request thread + */ + def wrap[T](fun: T => Unit)(request: RequestChannel.Request): T => Unit = { Review Comment: My original concern was that if we just used the thread local, we would access it when the inner method is called. I guess I can just save a local variable when wrap is called and pass that value into the inner method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
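jolshan's resolution — read the thread-local when `wrap` is called and close over that value, rather than re-reading it when the callback later runs on a different thread — is a general closure-capture pattern. A Python sketch of the difference:

```python
import threading

current_request = threading.local()  # set by each request-handler thread

def wrap(fun):
    """Capture the current request NOW, on the request thread, so the
    returned callback does not depend on which thread later invokes it."""
    captured = current_request.value  # read at wrap time, not at call time
    def callback(arg):
        return fun(arg, captured)
    return callback
```

Reading `current_request.value` inside `callback` instead would pick up whatever request the *invoking* thread happens to be handling — exactly the bug the original thread-local-only approach risked.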
[jira] [Updated] (KAFKA-14420) MirrorMaker should not clear filtered configs on target topics
[ https://issues.apache.org/jira/browse/KAFKA-14420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14420: -- Fix Version/s: 3.5.0 > MirrorMaker should not clear filtered configs on target topics > -- > > Key: KAFKA-14420 > URL: https://issues.apache.org/jira/browse/KAFKA-14420 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.3.1 >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.5.0 > > > If you set additional configurations on a remote topic, MirrorMaker will > clear them when it syncs topic configurations. > The issue is that it also clears topic configurations that are filtered. For > example this prevents running Cruise Control on the target cluster as it may > set follower.replication.throttled.replicas and > leader.replication.throttled.replicas. > MirrorMaker should not clear topic configurations that are filtered on the > target cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14420) MirrorMaker should not clear filtered configs on target topics
[ https://issues.apache.org/jira/browse/KAFKA-14420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710197#comment-17710197 ] Chris Egerton commented on KAFKA-14420: --- Addressed with [KIP-894|https://cwiki.apache.org/confluence/display/KAFKA/KIP-894%3A+Use+incrementalAlterConfigs+API+for+syncing+topic+configurations]. > MirrorMaker should not clear filtered configs on target topics > -- > > Key: KAFKA-14420 > URL: https://issues.apache.org/jira/browse/KAFKA-14420 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.3.1 >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.5.0 > > > If you set additional configurations on a remote topic, MirrorMaker will > clear them when it syncs topic configurations. > The issue is that it also clears topic configurations that are filtered. For > example this prevents running Cruise Control on the target cluster as it may > set follower.replication.throttled.replicas and > leader.replication.throttled.replicas. > MirrorMaker should not clear topic configurations that are filtered on the > target cluster. -- This message was sent by Atlassian Jira (v8.20.10#820010)
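An illustrative sketch (not MirrorMaker's actual code) of the behaviour this ticket asks for: when computing the target topic's configuration during a sync, keys excluded by the config filter keep whatever value the target already has, so settings such as `follower.replication.throttled.replicas` set by Cruise Control survive. The method and `Predicate`-based filter are assumptions for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

public class FilteredConfigSync {
    // Merge source and target configs: replicated keys come from the source,
    // filtered keys are carried over from the target unchanged.
    static Map<String, String> syncTopicConfig(Map<String, String> sourceConfig,
                                               Map<String, String> targetConfig,
                                               Predicate<String> shouldReplicate) {
        Map<String, String> result = new LinkedHashMap<>();
        sourceConfig.forEach((k, v) -> {
            if (shouldReplicate.test(k)) result.put(k, v);
        });
        targetConfig.forEach((k, v) -> {
            if (!shouldReplicate.test(k)) result.put(k, v);
        });
        return result;
    }
}
```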
[GitHub] [kafka] C0urante merged pull request #13373: Kafka-14420 Use incrementalAlterConfigs API for syncing topic configurations (KIP-894)
C0urante merged PR #13373: URL: https://github.com/apache/kafka/pull/13373
[GitHub] [kafka] ppatierno commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics
ppatierno commented on code in PR #13525: URL: https://github.com/apache/kafka/pull/13525#discussion_r1161834571 ## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java: ## @@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws IOException { state.initialize(new OffsetAndEpoch(0L, 0)); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); -assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); +assertEquals("observer", getMetric(metrics, "current-state").metricValue()); assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue()); assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue()); assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); state.transitionToFollower(2, 1); -assertEquals("follower", getMetric(metrics, "current-state").metricValue()); +assertEquals("observer", getMetric(metrics, "current-state").metricValue()); Review Comment: What I was referring to here is a broker that is not part of the controller quorum: it still fetches the metadata topic from the leader controller, but it is not one of the voters because it has the "broker" role. Isn't that an "observer"? This is also what we get from the `kafka-metadata-quorum` bin tool, which shows the brokers in the observer list. The purpose of the PR was to align the tool output with the KRaft current-state metric, which was reporting a broker as "follower" rather than "observer". I am not sure a broker has "resigned" in this case.
[GitHub] [kafka] C0urante commented on pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors
C0urante commented on PR #13424: URL: https://github.com/apache/kafka/pull/13424#issuecomment-1501942587 @mimaison thanks for the reviews. Do you think you'll have time to take another look at this before the April 12th feature freeze?
[GitHub] [kafka] C0urante commented on a diff in pull request #13424: KAFKA-14783 (KIP-875): New STOPPED state for connectors
C0urante commented on code in PR #13424: URL: https://github.com/apache/kafka/pull/13424#discussion_r1161808636 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1090,6 +1090,39 @@ public void putConnectorConfig(final String connName, final Map ); } +@Override +public void stopConnector(final String connName, final Callback callback) { +log.trace("Submitting request to transition connector {} to STOPPED state", connName); + +addRequest( +() -> { +refreshConfigSnapshot(workerSyncTimeoutMs); +if (!configState.contains(connName)) +throw new NotFoundException("Unknown connector " + connName); + +// We only allow the leader to handle this request since it involves writing task configs to the config topic +if (!isLeader()) { +callback.onCompletion(new NotLeaderException("Only the leader can transition connectors to the STOPPED state.", leaderUrl()), null); +return null; +} + +// TODO: We may want to add a new ConfigBackingStore method for stopping a connector so that Review Comment: I guess this wasn't really a `TODO` so much as a `MAYBE-DO`. I've removed it since on second thought it's a `DON'T-DO-FOR-NOW` as there is little if any benefit from that change, except possibly blocking the herder thread for less time if we're having trouble writing to the config topic. There's also the transactional producer logic that Yash mentioned, but since it's not guaranteed that we'll have access to one (if exactly-once source support is disabled), that's not worth considering right now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #13453: KAFKA-12525: Ignoring Stale status statuses when reading from Status …
C0urante commented on PR #13453: URL: https://github.com/apache/kafka/pull/13453#issuecomment-1501923378 @vamossagar12 I think this approach is a bit too broad. We intentionally permit "unsafe" writes for reasons documented in the [AbstractHerder](https://github.com/apache/kafka/blob/17435484e4c49eef440ee412a711a88fed08bf50/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L90-L109) and [KafkaStatusBackingStore](https://github.com/apache/kafka/blob/17435484e4c49eef440ee412a711a88fed08bf50/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java#L74-L87) Javadocs. Specifically: > this prevents us from depending on the generation absolutely. If the group disappears and the generation is reset, then we'll overwrite the status information with the older (and larger) generation with the updated one. The danger of this approach is that slow starting tasks may cause the status to be overwritten after a rebalance has completed. I have a few alternatives in mind for how we might address this, but haven't fully thought any of them through yet since there are several KIP PRs that need to be reviewed right now which I'm giving priority to. To give me some idea of how highly to prioritize this once those are taken care of, can you let me know if this issue is actively affecting you or someone you know, or if the PR is a way to get up-to-speed with Kafka Connect, or something else? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
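A hedged sketch of the trade-off quoted above, not the actual KafkaStatusBackingStore logic. With strictly "safe" writes (newer generation only), a status written under an older-but-larger generation after a group reset could never be overwritten; permitting "unsafe" writes accepts the risk that a slow-starting task overwrites status after a rebalance completes. Names and the exact condition are illustrative.

```java
public class StatusWritePolicy {
    // Decide whether an incoming status update should be persisted, given the
    // generation stored for the existing entry and the updater's generation.
    static boolean accept(long storedGeneration, long updateGeneration, boolean safeWrite) {
        if (updateGeneration >= storedGeneration) {
            return true; // normal path: same or newer generation wins
        }
        // The update looks stale. Under safe writes we drop it; under unsafe
        // writes we keep it, since the stored entry may predate a group reset
        // (the generation restarted from a smaller number, leaving a stale,
        // larger generation behind).
        return !safeWrite;
    }
}
```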
[GitHub] [kafka] C0urante commented on pull request #13466: MINOR: Fix base ConfigDef in AbstractHerder::connectorPluginConfig
C0urante commented on PR #13466: URL: https://github.com/apache/kafka/pull/13466#issuecomment-1501875562 Thanks Mickael!
[GitHub] [kafka] dengziming commented on a diff in pull request #13525: KAFKA-14883: Expose `observer` state in KRaft metrics
dengziming commented on code in PR #13525: URL: https://github.com/apache/kafka/pull/13525#discussion_r1161721906 ## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java: ## @@ -137,14 +137,14 @@ public void shouldRecordNonVoterQuorumState() throws IOException { state.initialize(new OffsetAndEpoch(0L, 0)); raftMetrics = new KafkaRaftMetrics(metrics, "raft", state); -assertEquals("unattached", getMetric(metrics, "current-state").metricValue()); +assertEquals("observer", getMetric(metrics, "current-state").metricValue()); assertEquals((double) -1L, getMetric(metrics, "current-leader").metricValue()); assertEquals((double) -1L, getMetric(metrics, "current-vote").metricValue()); assertEquals((double) 0, getMetric(metrics, "current-epoch").metricValue()); assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); state.transitionToFollower(2, 1); -assertEquals("follower", getMetric(metrics, "current-state").metricValue()); +assertEquals("observer", getMetric(metrics, "current-state").metricValue()); Review Comment: In our code design, observer/voter is a property of a follower, while leader/candidate/voted/follower/unattached/resigned are the states of the Raft node state machine, so IMO we'd better add a "resigned" state instead of an "observer" state.
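Illustrative only (not the KafkaRaftMetrics implementation): one way to frame the question debated in this thread. If observer/voter is a property layered on top of the follower state, the metric can report "observer" for a non-voter follower (e.g. a broker replicating the metadata log) while voters keep the plain state-machine name. The enum and helper names are assumptions for the example.

```java
import java.util.Locale;

public class RaftStateLabel {
    enum RaftState { UNATTACHED, VOTED, CANDIDATE, LEADER, FOLLOWER, RESIGNED }

    // Map the state-machine state plus voter membership to a metric label.
    static String currentStateLabel(RaftState state, boolean isVoter) {
        if (state == RaftState.FOLLOWER && !isVoter) {
            return "observer"; // non-voter replicating the metadata log
        }
        return state.name().toLowerCase(Locale.ROOT);
    }
}
```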
[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager
showuon commented on code in PR #13487: URL: https://github.com/apache/kafka/pull/13487#discussion_r1161677863 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -0,0 +1,736 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.cluster.Partition; +import kafka.log.LogSegment; +import kafka.log.UnifiedLog; +import kafka.server.KafkaConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RemoteLogInputStream; +import org.apache.kafka.common.utils.ChildFirstClassLoader; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager; +import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.LogSegmentData; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; 
+import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments + * - copying log segments to remote storage + */ +public class RemoteLogManager implements Closeable { + +private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.c
[GitHub] [kafka] satishd commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager
satishd commented on code in PR #13487: URL: https://github.com/apache/kafka/pull/13487#discussion_r1161670474 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -0,0 +1,749 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.cluster.Partition; +import kafka.log.LogSegment; +import kafka.log.UnifiedLog; +import kafka.server.KafkaConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RemoteLogInputStream; +import org.apache.kafka.common.utils.ChildFirstClassLoader; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager; +import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.LogSegmentData; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; 
+import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments + * - copying log segments to remote storage + */ +public class RemoteLogManager implements Closeable { + +private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.c
[GitHub] [kafka] satishd commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager
satishd commented on code in PR #13487: URL: https://github.com/apache/kafka/pull/13487#discussion_r1161286998 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -0,0 +1,749 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote; + +import kafka.cluster.Partition; +import kafka.log.LogSegment; +import kafka.log.UnifiedLog; +import kafka.server.KafkaConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RemoteLogInputStream; +import org.apache.kafka.common.utils.ChildFirstClassLoader; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager; +import 
org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.LogSegmentData; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; +import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.EpochEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.lang.reflect.InvocationTargetException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; 
+import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class is responsible for + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances + * - receives any leader and follower replica events and partition stop events and act on them + * - also provides APIs to fetch indexes, metadata about remote log segments + * - copying log segments to remote storage + */ +public class RemoteLogManager implements Closeable { + +private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.c
[GitHub] [kafka] Stephan14 opened a new pull request, #13531: KAFKA-14885: fix kafka client connect to the broker that offline from…
Stephan14 opened a new pull request, #13531: URL: https://github.com/apache/kafka/pull/13531 … cluster *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down
hudeqi commented on PR #13473: URL: https://github.com/apache/kafka/pull/13473#issuecomment-1501643812 > Do other committers have time to review this PR? @guozhangwang Hello, could you help review these two PRs: this one and https://github.com/apache/kafka/pull/13471?
[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader
mukkachaitanya commented on code in PR #13165: URL: https://github.com/apache/kafka/pull/13165#discussion_r1161593044 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java: ## @@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath( builder.useParallelExecutor(); Reflections reflections = new InternalReflections(builder); -return new PluginScanResult( -getPluginDesc(reflections, SinkConnector.class, loader), -getPluginDesc(reflections, SourceConnector.class, loader), -getPluginDesc(reflections, Converter.class, loader), -getPluginDesc(reflections, HeaderConverter.class, loader), -getTransformationPluginDesc(loader, reflections), -getPredicatePluginDesc(loader, reflections), -getServiceLoaderPluginDesc(ConfigProvider.class, loader), -getServiceLoaderPluginDesc(ConnectRestExtension.class, loader), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader) -); +ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader); Review Comment: `Plugins#withClassLoader` would be a nice touch for these repetitive swaps; however, it doesn't seem to be a static method, and I don't see a strong reason why it isn't. If it's not too far out of the scope of this PR, can we make it static and use it to make the code cleaner? Alternatively, if there are several instances where the code benefits from using the new static method, we can tackle them in a separate refactoring PR.
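A sketch of the swap-and-restore pattern behind `compareAndSwapLoaders`, written as the static helper the comment proposes. This is a hypothetical shape, not the Connect `Plugins` API: it sets the thread context classloader for the duration of the action and restores the previous loader in a `finally` block.

```java
import java.util.function.Supplier;

public class ClassLoaderUtil {
    // Run the action with the given context classloader, restoring the
    // previous loader afterwards even if the action throws.
    static <T> T withClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(saved);
        }
    }
}
```

Making the helper static lets any call site replace the manual save/set/try/finally dance with a single call, which is the cleanup the comment suggests.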
[jira] [Updated] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper
[ https://issues.apache.org/jira/browse/KAFKA-14885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zou shengfu updated KAFKA-14885:

Description: When a broker has network issues, it can not connect to zookeeper or the controller. At this point, we replace it with a new broker that has the same `broker.id` as the faulty broker, but we can not stop the Kafka process on the faulty broker because of the network issue. So clients can still connect to the faulty broker and produce and consume messages normally. But data on the faulty broker may be lost, because it still holds the leader for some partitions. Do we have any good idea to solve this problem? In my opinion, we can check the broker configuration (for example, the broker ip) when the broker reconnects to zookeeper, and the broker can exit if its configuration does not match what is registered in zookeeper. But if the broker can not reconnect to zookeeper successfully, maybe we need to periodically compare the broker configuration on local disk with zookeeper.
> Client can connect to broker and broker can not connect zookeeper
> -----------------------------------------------------------------
>
> Key: KAFKA-14885
> URL: https://issues.apache.org/jira/browse/KAFKA-14885
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.3.2
> Reporter: zou shengfu
> Assignee: zou shengfu
> Priority: Major
>
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14581) Move GetOffsetShell to tools
[ https://issues.apache.org/jira/browse/KAFKA-14581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708011#comment-17708011 ] Ruslan Krivoshein edited comment on KAFKA-14581 at 4/10/23 8:12 AM:

Let it be booked for me, please. UPD: I have almost finished it, based on an unaccepted PR for the blocking task (which looks complete).

was (Author: krivosheinruslan): Let it be booked for me, please

> Move GetOffsetShell to tools
> ----------------------------
>
> Key: KAFKA-14581
> URL: https://issues.apache.org/jira/browse/KAFKA-14581
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Mickael Maison
> Priority: Major
[jira] [Created] (KAFKA-14885) Client can connect to broker and broker can not connect zookeeper
zou shengfu created KAFKA-14885:

Summary: Client can connect to broker and broker can not connect zookeeper
Key: KAFKA-14885
URL: https://issues.apache.org/jira/browse/KAFKA-14885
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 3.3.2
Reporter: zou shengfu
Assignee: zou shengfu
[jira] [Commented] (KAFKA-14873) Pluggable storage for Kafka Connect internal topics
[ https://issues.apache.org/jira/browse/KAFKA-14873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710078#comment-17710078 ] Sagar Rao commented on KAFKA-14873:

Hi [~malthe], thanks for reporting this. I think your point makes sense in general terms. That's why all three backing stores are implemented as interfaces: `OffsetBackingStore`, `StatusBackingStore`, and `ConfigBackingStore`. The Kafka-backed variants we see are just implementors of these interfaces, and typically there is a memory-based equivalent for each, which IIUC is used in standalone mode. Do you have any KV-based backing stores in mind that you would want to see added? I see Azure Table Storage has been cited as an example in the OP. If you think something like this is needed, it would need to go through a KIP process, since a new public interface would be added. Let me know if you want to take it further via a KIP.

> Pluggable storage for Kafka Connect internal topics
> ---------------------------------------------------
>
> Key: KAFKA-14873
> URL: https://issues.apache.org/jira/browse/KAFKA-14873
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Malthe Borch
> Priority: Major
> Labels: needs-kip
>
> The Kafka Connect framework relies on compacted topics to store config,
> offset and status information for each connector.
> This conflates two kinds of data, control and content, which some people
> disagree with. Notably, [Azure Event Hub|https://learn.microsoft.com/en-us/azure/event-hubs/log-compaction] does
> not (or _did not_, because there's currently a preview release out which does
> have support for compacted topics, albeit only at the more expensive premium
> tiers).
> In some deployments, it may be desirable to use a different backend for these
> control settings (which essentially take a key/value form), for example
> [Azure Table Storage|https://learn.microsoft.com/en-us/rest/api/storageservices/table-service-rest-api]
> – basically any key/value store that provides the Write-If-Matches primitive
> to update a key only if the current value matches a known value.
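The Write-If-Matches primitive the issue describes can be sketched with a minimal in-memory store. This is an illustration only (the class and method names are hypothetical, not part of Kafka Connect or Azure Table Storage): a conditional update succeeds only when the stored value still equals the value the caller last observed, which is what `ConcurrentHashMap.replace(key, expected, update)` provides atomically.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory sketch of a "Write-If-Matches" key/value store.
// Names here are hypothetical; real backends (e.g. Azure Table Storage)
// expose this via conditional writes such as ETag-matched updates.
public final class CasKeyValueStore {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    public void put(String key, String value) {
        store.put(key, value);
    }

    public String get(String key) {
        return store.get(key);
    }

    // Atomically update `key` only if its current value equals `expected`.
    // Returns false (leaving the store unchanged) when the value has moved on.
    public boolean writeIfMatches(String key, String expected, String update) {
        return store.replace(key, expected, update);
    }

    public static void main(String[] args) {
        CasKeyValueStore kv = new CasKeyValueStore();
        kv.put("connector-config", "v1");
        System.out.println(kv.writeIfMatches("connector-config", "v1", "v2")); // succeeds: value matched
        System.out.println(kv.writeIfMatches("connector-config", "v1", "v3")); // fails: value is now "v2"
        System.out.println(kv.get("connector-config"));
    }
}
```

A backing store built on such a primitive can detect concurrent writers: a lost race surfaces as a failed conditional write rather than a silent overwrite, which is the property compacted Kafka topics currently provide via single-writer ordering.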
[jira] [Commented] (KAFKA-14750) Sink connector fails if a topic matching its topics.regex gets deleted
[ https://issues.apache.org/jira/browse/KAFKA-14750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17710077#comment-17710077 ] Sagar Rao commented on KAFKA-14750:

I ran a few more tests for this. The issue shows up specifically when a mass delete of topics is issued. Typically, if only a few topics are deleted (I tested with 1 to 10 topics), the `position` API doesn't fail. It logs the following:
```
[2023-04-10 12:19:33,115] INFO [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Resetting offset for partition connect-test-3010-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:399)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-300-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3002-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3003-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3004-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3005-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3006-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3007-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3008-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3009-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,115] DEBUG [local-file-sink-300|task-0] WorkerSinkTask{id=local-file-sink-300-0} Assigned topic partition connect-test-3010-0 with offset 0 (org.apache.kafka.connect.runtime.WorkerSinkTask:700)
[2023-04-10 12:19:33,660] WARN [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Received unknown topic or partition error in fetch for partition connect-test-3002-0 (org.apache.kafka.clients.consumer.internals.Fetcher:1340)
[2023-04-10 12:19:33,661] INFO [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Request joining group due to: cached metadata has changed from (version362: {connect-test-3007=1, connect-test-3008=1, connect-test-3009=1, connect-test-3003=1, connect-test-300=1, connect-test-3004=1, connect-test-3005=1, connect-test-3006=1, connect-test-3010=1, connect-test-3002=1}) at the beginning of the rebalance to (version363: {connect-test-3007=1, connect-test-3008=1, connect-test-3009=1, connect-test-3003=1, connect-test-300=1, connect-test-3004=1, connect-test-3005=1, connect-test-3006=1, connect-test-3010=1}) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1066)
[2023-04-10 12:19:34,165] INFO [local-file-sink-300|task-0] [Consumer clientId=connector-consumer-local-file-sink-300-0, groupId=connect-local-file-sink-300] Revoke previously assigned partitions connect-test-300-0, connect-test-3002-0, connect-test-3003-0, connect-test-3004-0, connect-test-3005-0, connect-test-3006-0, connect-test-3007-0, connect-test-3008-0, connect-test-3009-0, connect-test-3010-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:331)
```
Notice the second `Request joining group due to: cached metadata has changed from` entry, which comes after `Received unknown topic or partition error in fetch for partition`. In the sort of test that was executed above, the line `Received unknown topic or partition error in fetch for partition` is not foll