Re: [PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]
divijvaidya commented on PR #14788: URL: https://github.com/apache/kafka/pull/14788#issuecomment-1817397316 This is a public facing change. Is this part of a KIP? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
philipnee commented on PR #14680: URL: https://github.com/apache/kafka/pull/14680#issuecomment-1817393641 Thanks @lucasbru
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on PR #14570: URL: https://github.com/apache/kafka/pull/14570#issuecomment-1817349035 @mjsax I have already updated the javadoc.
Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
jolshan commented on code in PR #14789: URL: https://github.com/apache/kafka/pull/14789#discussion_r1398026217

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ##

@@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() {
         assertEquals(5, recordsToTest.get(1).offset());
     }
+    /**
+     * KAFKA-15836:
+     * Test that max.poll.records is honoured when consuming from multiple topic-partitions and the
+     * fetched records are not aligned on max.poll.records boundaries.
+     *
+     * tp0 has records 1,2,3; tp1 has records 6,7,8
+     * max.poll.records is 2
+     *
+     * poll 1 should return 1,2
+     * poll 2 should return 3,6
+     * poll 3 should return 7,8
+     *
+     * Or similar :)
+     */
+    @Test
+    public void testFetchMaxPollRecordsUnaligned() {
+        buildFetcher(2);
+
+        Set tps = new HashSet<>();
+        tps.add(tp0);
+        tps.add(tp1);
+        assignFromUser(tps);
+        subscriptions.seek(tp0, 1);
+        subscriptions.seek(tp1, 6);
+
+        client.prepareResponse(fetchResponse2(tidp0, records, 100L, tidp1, moreRecords, 100L));
+        client.prepareResponse(fullFetchResponse(tidp0, emptyRecords, Errors.NONE, 100L, 0));
+
+        assertEquals(1, sendFetches());
+        consumerClient.poll(time.timer(0));

Review Comment:
It's a little confusing to reuse the recordsByPartition, fetchedRecords values on each call. I wonder if there is a way to modularize or make it clearer things are being reset and avoid code duplication. I'm also wondering if we could have comments on each of these cases for what we expect to be happening.
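The behaviour described in the test's javadoc can be illustrated without any Kafka classes. The following is a minimal sketch, assuming a hypothetical `MaxPollRecordsSketch` class that stands in for the consumer's buffer of fetched records; it is not the `Fetcher` implementation and only models "drain buffered partitions in order, never returning more than max.poll.records per poll".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the consumer's fetch buffer; not Kafka's Fetcher.
class MaxPollRecordsSketch {
    // One queue of record offsets per fetched partition, in fetch order.
    private final Deque<Deque<Long>> buffered = new ArrayDeque<>();
    private final int maxPollRecords;

    MaxPollRecordsSketch(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // Buffer the offsets fetched for one partition.
    void addFetched(Long... offsets) {
        buffered.add(new ArrayDeque<>(Arrays.asList(offsets)));
    }

    // Drain partitions in order, stopping as soon as maxPollRecords is reached.
    List<Long> poll() {
        List<Long> out = new ArrayList<>();
        while (out.size() < maxPollRecords && !buffered.isEmpty()) {
            Deque<Long> head = buffered.peek();
            while (out.size() < maxPollRecords && !head.isEmpty()) {
                out.add(head.poll());
            }
            if (head.isEmpty()) {
                buffered.poll(); // partition fully drained, move to the next one
            }
        }
        return out;
    }

    public static void main(String[] args) {
        MaxPollRecordsSketch sketch = new MaxPollRecordsSketch(2);
        sketch.addFetched(1L, 2L, 3L); // tp0
        sketch.addFetched(6L, 7L, 8L); // tp1
        System.out.println(sketch.poll()); // [1, 2]
        System.out.println(sketch.poll()); // [3, 6]
        System.out.println(sketch.poll()); // [7, 8]
    }
}
```

The second poll is the unaligned case: it has to cross from tp0 into tp1 while still respecting the limit of 2.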
[PR] MINOR: Change test logging capture to per-test, reducing jenkins truncation [kafka]
gharris1727 opened a new pull request, #14795: URL: https://github.com/apache/kafka/pull/14795

Jenkins truncates stdout/stderr from tests when it exceeds 100,000 bytes. This truncation is computed once per suite, meaning that each suite gets a 100kb budget for logs, and suites that log too much have the middle of the log truncated. This unfairly discards complete logs for tests in the middle of the suite, while keeping logs from the beginning and end of the suite. If a failure occurs in a single test in the middle of a suite, the relevant logs may be completely elided, making investigation of the failure more difficult.

This has made debugging with the CI logging almost completely ineffective, as the relevant logs are often swallowed by Jenkins, and irrelevant logs are shown.

Instead, we can enable this feature in the Gradle JUnitXmlReport: https://docs.gradle.org/current/javadoc/org/gradle/api/tasks/testing/JUnitXmlReport.html#setOutputPerTestCase-boolean-

This changes the way that stdout/stderr is embedded in the XML report, separating the output for each test into different XML tags. This may enable Jenkins to perform truncation on a per-test basis, so that each test in a suite gets a fair share of the logging budget.

This could increase the size of the logs persisted by Jenkins: each suite is currently capped at 100kb, but after this change you could receive N*100kb of logs overall if there are N tests in the suite. However, it appears that Jenkins cannot show the stdout for passing tests, so it probably isn't capturing it (I couldn't find a configuration that would confirm this). If this is true, the size of the logs persisted will only increase for test failures, when the additional logs would be useful. This change may also reduce the total amount of logs captured, since logs from tests that passed won't be kept when another test in the same suite fails.

Regardless, the more effective usage of the logging budget will be beneficial even if the total amount of logs persisted increases.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
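The difference between a per-suite and a per-test budget can be shown with a toy model. This is a minimal sketch, assuming a hypothetical `LogBudgetSketch` class whose `truncateMiddle` keeps the head and tail of a log and drops the middle; it is a simplification of the behaviour described in the PR text, not Jenkins or Gradle code, and the budget of 8 characters is chosen only to keep the example small.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of "keep head and tail, drop the middle"; not Jenkins code.
class LogBudgetSketch {
    // Keep the first and last halves of the budget, discard everything between.
    static String truncateMiddle(String log, int budget) {
        if (log.length() <= budget) {
            return log;
        }
        int head = budget / 2;
        int tail = budget - head;
        return log.substring(0, head) + log.substring(log.length() - tail);
    }

    // One budget shared by the whole suite: tests in the middle lose their logs.
    static String perSuite(List<String> testLogs, int budget) {
        return truncateMiddle(String.join("", testLogs), budget);
    }

    // One budget per test: every test keeps its own head and tail.
    static List<String> perTest(List<String> testLogs, int budget) {
        return testLogs.stream()
                .map(log -> truncateMiddle(log, budget))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> logs = Arrays.asList("AAAAAAAA", "BBBBBBBB", "CCCCCCCC");
        System.out.println(perSuite(logs, 8)); // AAAACCCC (test B is gone entirely)
        System.out.println(perTest(logs, 8));  // [AAAAAAAA, BBBBBBBB, CCCCCCCC]
    }
}
```

In this model the per-test variant stores up to N times the budget, which matches the N*100kb estimate in the description.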
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398012157

## streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java: ##

@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
+import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+
+import java.time.Duration;
+import java.util.Map;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.LeftOrRightValue;
+import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde;
+import org.apache.kafka.streams.state.internals.ListValueStoreBuilder;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
+import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde;
+
+public class OuterStreamJoinStoreFactory extends AbstractConfigurableStoreFactory {
+
+    private final String name;
+    private final StreamJoinedInternal streamJoined;
+    private final JoinWindows windows;
+    private boolean loggingEnabled;
+
+    public enum Type {
+        RIGHT,
+        LEFT
+    }
+
+    public OuterStreamJoinStoreFactory(
+        final String name,
+        final StreamJoinedInternal streamJoined,
+        final JoinWindows windows,
+        final Type type
+    ) {
+        super(streamJoined.dslStoreSuppliers());
+        this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + "-store";
+        this.streamJoined = streamJoined;
+        this.windows = windows;
+        this.loggingEnabled = streamJoined.loggingEnabled();
+    }
+
+    @Override
+    public StateStore build() {
+        final Duration retentionPeriod = Duration.ofMillis(retentionPeriod());
+        final Duration windowSize = Duration.ofMillis(windows.size());
+        final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
+        final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
+        final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
+        final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix);
+
+        if (retentionMs < 0L) {
+            throw new IllegalArgumentException("retentionPeriod cannot be negative");
+        }
+        if (windowSizeMs < 0L) {
+            throw new IllegalArgumentException("windowSize cannot be negative");
+        }
+        if (windowSizeMs > retentionMs) {
+            throw new IllegalArgumentException("The retention period of the window store "
+                + name + " must be no smaller than its window size. Got size=["
+                + windowSizeMs + "], retention=[" + retentionMs + "]");
+        }
+
+        final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
+        final LeftOrRightValueSerde leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
+
+        // TODO: we should allow for configuration of this store explicitly instead of assuming that it should
+        // share the same type of store as thisStoreSupplier
+        final boolean useInMemoryStore = streamJoined.thisStoreSupplier() != null
+            && streamJoined.thisStoreSupplier() instanceof InMemoryWindowBytesStoreSupplier;
+        final KeyValueBytesStoreSupplier supplier = useInMe
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398006947

## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ##

@@ -1038,13 +1038,15 @@ public void shouldThrowConfigExceptionWhenOptimizationConfigNotValueInRange() {
         assertThrows(ConfigException.class, () -> new StreamsConfig(props));
     }
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
         final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
         final String actualDefaultStoreType = streamsConfig.getString(DEFAULT_DSL_STORE_CONFIG);
         assertEquals("default.dsl.store should be \"rocksDB\"", expectedDefaultStoreType, actualDefaultStoreType);
     }
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {

Review Comment:
Can you add some basic tests for the new store config? Basically something like what we have for the old configs -- ie the default is rocksdb and that the built-in InMemoryDslStoreSuppliers -- but instead of the bad-enum one, just make sure you can actually pass in a custom implementation
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398005349

## streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java: ##

@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+    private final String name;
+    private final Duration retentionPeriod;
+    private final Duration windowSize;
+    private final boolean retainDuplicates;
+    private final EmitStrategy emitStrategy;
+    private final boolean isSlidingWindow;
+
+    /**
+     * @param name name of the store (cannot be {@code null})
+     * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+     *        (note that the retention period must be at least long enough to contain the
+     *        windowed data's entire life cycle, from window-start through window-end,
+     *        and for the entire grace period)

Review Comment:
Actually IIUC users should never have to instantiate these classes directly, so I guess it's not that critical to define the specific constraints and ok to describe it conceptually. Or both 🤷♀️
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398004582

## streams/src/main/java/org/apache/kafka/streams/state/DslWindowParams.java: ##

@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslWindowParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#windowStore(DslWindowParams)}.
+ */
+public class DslWindowParams {
+
+    private final String name;
+    private final Duration retentionPeriod;
+    private final Duration windowSize;
+    private final boolean retainDuplicates;
+    private final EmitStrategy emitStrategy;
+    private final boolean isSlidingWindow;
+
+    /**
+     * @param name name of the store (cannot be {@code null})
+     * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+     *        (note that the retention period must be at least long enough to contain the
+     *        windowed data's entire life cycle, from window-start through window-end,
+     *        and for the entire grace period)

Review Comment:
this is kind of a roundabout way of saying it must be at least equal to the windowSize + gracePeriod. I think it's better to just explicitly state this and put the constraint in concrete terms relative to other known quantities
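The concrete form of the constraint suggested in this comment, retentionPeriod >= windowSize + gracePeriod, can be written down directly with `java.time.Duration`. This is a minimal sketch, assuming a hypothetical `RetentionCheckSketch.validateRetention` helper; the name and signature are illustrative and are not part of the PR or of Kafka Streams.

```java
import java.time.Duration;

// Hypothetical helper; mirrors the wording of the review comment, not a Kafka API.
class RetentionCheckSketch {
    // Throws if retentionPeriod is shorter than windowSize + gracePeriod.
    static void validateRetention(Duration retentionPeriod, Duration windowSize, Duration gracePeriod) {
        Duration minimum = windowSize.plus(gracePeriod);
        if (retentionPeriod.compareTo(minimum) < 0) {
            throw new IllegalArgumentException(
                "retentionPeriod " + retentionPeriod
                    + " must be at least windowSize + gracePeriod = " + minimum);
        }
    }

    public static void main(String[] args) {
        // 10 minute window plus 5 minute grace: 15 minutes of retention is just enough.
        validateRetention(Duration.ofMinutes(15), Duration.ofMinutes(10), Duration.ofMinutes(5));
        System.out.println("15 minutes accepted");
        try {
            validateRetention(Duration.ofMinutes(14), Duration.ofMinutes(10), Duration.ofMinutes(5));
        } catch (IllegalArgumentException e) {
            System.out.println("14 minutes rejected: " + e.getMessage());
        }
    }
}
```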
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398003882

## streams/src/main/java/org/apache/kafka/streams/state/DslStoreSuppliers.java: ##

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.util.Map;
+import org.apache.kafka.common.Configurable;
+
+/**
+ * {@code DslStoreSuppliers} defines a grouping of factories to construct
+ * stores for each of the types of state store implementations in Kafka
+ * Streams. This allows configuration of a default store supplier beyond
+ * the builtin defaults of RocksDB and In-Memory.

Review Comment:
I think we should be a bit more explicit about how this gets used, see https://github.com/apache/kafka/pull/14648/files#r1397992729
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398003330

## streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java: ##

@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslSessionParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#sessionStore(DslSessionParams)}.
+ */
+public class DslSessionParams {
+
+    private final String name;
+    private final Duration retentionPeriod;
+    private final EmitStrategy emitStrategy;
+
+    /**
+     * @param name name of the store (cannot be {@code null})
+     * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+     *        (note that the retention period must be at least as long enough to
+     *        contain the inactivity gap of the session and the entire grace period.)
+     * @param emitStrategy defines how to emit results
+     */
+    public DslSessionParams(
+        final String name,
+        final Duration retentionPeriod,
+        final EmitStrategy emitStrategy
+    ) {
+        Objects.requireNonNull(name);
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.emitStrategy = emitStrategy;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Duration retentionPeriod() {
+        return retentionPeriod;
+    }
+
+    public EmitStrategy emitStrategy() {
+        return emitStrategy;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final DslSessionParams that = (DslSessionParams) o;
+        return Objects.equals(name, that.name)
+            && Objects.equals(retentionPeriod, that.retentionPeriod)
+            && Objects.equals(emitStrategy, that.emitStrategy);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, retentionPeriod, emitStrategy);
+    }
+
+    @Override
+    public String toString() {
+        return "DslSessionParams{" +
+            "name='" + name + '\'' +

Review Comment:
Actually looks like we have it in the KV and Window params as well. If it's intentional it should probably be on every line, but I'm guessing it's not?
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398002738

## streams/src/main/java/org/apache/kafka/streams/state/DslSessionParams.java: ##

@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+
+/**
+ * {@code DslSessionParams} is a wrapper class for all parameters that function
+ * as inputs to {@link DslStoreSuppliers#sessionStore(DslSessionParams)}.
+ */
+public class DslSessionParams {
+
+    private final String name;
+    private final Duration retentionPeriod;
+    private final EmitStrategy emitStrategy;
+
+    /**
+     * @param name name of the store (cannot be {@code null})
+     * @param retentionPeriod length of time to retain data in the store (cannot be negative)
+     *        (note that the retention period must be at least as long enough to
+     *        contain the inactivity gap of the session and the entire grace period.)
+     * @param emitStrategy defines how to emit results
+     */
+    public DslSessionParams(
+        final String name,
+        final Duration retentionPeriod,
+        final EmitStrategy emitStrategy
+    ) {
+        Objects.requireNonNull(name);
+        this.name = name;
+        this.retentionPeriod = retentionPeriod;
+        this.emitStrategy = emitStrategy;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Duration retentionPeriod() {
+        return retentionPeriod;
+    }
+
+    public EmitStrategy emitStrategy() {
+        return emitStrategy;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final DslSessionParams that = (DslSessionParams) o;
+        return Objects.equals(name, that.name)
+            && Objects.equals(retentionPeriod, that.retentionPeriod)
+            && Objects.equals(emitStrategy, that.emitStrategy);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, retentionPeriod, emitStrategy);
+    }
+
+    @Override
+    public String toString() {
+        return "DslSessionParams{" +
+            "name='" + name + '\'' +

Review Comment:
what's with the `+ '\''` (guessing a copy/paste error?)
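For readers wondering what the `+ '\''` does at runtime: it appends a closing single quote, so the `String` field is printed between quotes. Whether that was intentional in the PR is not stated in the thread; one common source of this pattern is IDE-generated `toString()` methods, which quote only `String` fields. A minimal sketch, assuming a hypothetical `DslParamsToStringSketch` class with one `String` field and one numeric field:

```java
// Hypothetical class for illustration only; not part of the PR.
class DslParamsToStringSketch {
    private final String name;
    private final long retentionMs;

    DslParamsToStringSketch(String name, long retentionMs) {
        this.name = name;
        this.retentionMs = retentionMs;
    }

    @Override
    public String toString() {
        return "DslParamsToStringSketch{" +
            "name='" + name + '\'' +          // opening quote is in the literal, '\'' closes it
            ", retentionMs=" + retentionMs +  // non-String field: no quotes
            '}';
    }

    public static void main(String[] args) {
        // Prints: DslParamsToStringSketch{name='join-store', retentionMs=1000}
        System.out.println(new DslParamsToStringSketch("join-store", 1000L));
    }
}
```

Under that reading, the quote appearing on the `name` line only would be expected rather than a copy/paste slip.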
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1398002400 ## streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier; + +public class BuiltInDslStoreSuppliers { Review Comment: Actually maybe the stuff I listed above should be in the `DslStoreSuppliers` interface javadocs instead of here. But I still think this class should have javadocs, even just a sentence or two
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1397992729 ## streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java: ## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier; + +public class BuiltInDslStoreSuppliers { Review Comment: I think we should probably have javadocs for this class and the two built-in implementations. They can be brief but should cover at least how/where to use them: 1. how to configure an app-wide global default (including the config name) 2. how to configure individual operators via Materialized and StreamJoined 3. the hierarchy of preference if specified in multiple places (i.e., StreamJoined/Materialized overrides the configured global default, which overrides the unconfigured Streams default)
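To make the third point of the comment above concrete, here is a minimal sketch of the precedence order it describes. The class, method, and constant names are hypothetical, plain strings stand in for the actual supplier objects, and this is not the Kafka Streams implementation:

```java
// Hypothetical sketch of the precedence order; not Kafka Streams code.
final class StoreSupplierPrecedenceSketch {

    // Assumed placeholder for whatever Streams uses when nothing is configured.
    static final String UNCONFIGURED_STREAMS_DEFAULT = "streams-default";

    // operatorLevel: value supplied via Materialized/StreamJoined, or null if absent.
    // configuredDefault: app-wide configured value, or null if absent.
    static String resolve(final String operatorLevel, final String configuredDefault) {
        if (operatorLevel != null) {
            return operatorLevel;          // most specific setting wins
        }
        if (configuredDefault != null) {
            return configuredDefault;      // then the configured global default
        }
        return UNCONFIGURED_STREAMS_DEFAULT; // finally the built-in fallback
    }

    public static void main(final String[] args) {
        System.out.println(resolve("per-operator", "global"));
        System.out.println(resolve(null, "global"));
        System.out.println(resolve(null, null));
    }
}
```

Javadocs along these lines would let a reader predict which supplier is used without having to trace the store factory code.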
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1397987652 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.java: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.EmitStrategy; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.DslWindowParams; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +public class StreamJoinedStoreFactory extends AbstractConfigurableStoreFactory { + +private final String name; +private final JoinWindows windows; +private final Serde valueSerde; +private final WindowBytesStoreSupplier storeSupplier; +private final StreamJoinedInternal joinedInternal; + +private boolean loggingEnabled; +private final Map logConfig; + +public enum Type { +THIS, +OTHER +} + +public StreamJoinedStoreFactory( +final String name, +final JoinWindows windows, +final StreamJoinedInternal joinedInternal, +final Type type +) { +super(joinedInternal.dslStoreSuppliers()); +this.name = name + "-store"; +this.joinedInternal = joinedInternal; +this.windows = windows; +this.loggingEnabled = joinedInternal.loggingEnabled(); +this.logConfig = new HashMap<>(joinedInternal.logConfig()); + +// since this store is configured to retain duplicates, we should +// not compact, so we override the configuration to make sure that +// it's just delete (window stores are configured to compact,delete) +this.logConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); Review Comment: I guess it would be nice to have a test for this logic but honestly that could probably be saved for the followup work mentioned in that 
TODO which would be a more involved refactoring of the changelog topic configuration. (Haven't gotten to the tests yet so maybe you already did this)
Re: [PR] KAFKA-15215: [KIP-954] support custom DSL store providers [kafka]
ableegoldman commented on code in PR #14648: URL: https://github.com/apache/kafka/pull/14648#discussion_r1397982732 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java: ## @@ -66,6 +66,10 @@ default void configure(final StreamsConfig config) { boolean isVersionedStore(); +// TODO: consider moving all the log configuration code (InternalTopicConfig) Review Comment: this would be a great cleanup, I'm definitely not a fan of the `#isWindowStore` and `retentionPeriod` logic which kind of breaks the abstraction. Just wondering, would we also be able to get rid of the `#historyRetention` and `#isVersionedStore` or is that used for something else? ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java: ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; +import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; + +import java.time.Duration; +import java.util.Map; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.DslKeyValueParams; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.InMemoryWindowBytesStoreSupplier; +import org.apache.kafka.streams.state.internals.LeftOrRightValue; +import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde; +import org.apache.kafka.streams.state.internals.ListValueStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; +import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSideSerde; + +public class OuterStreamJoinStoreFactory extends AbstractConfigurableStoreFactory { + +private final String name; +private final StreamJoinedInternal streamJoined; +private final JoinWindows windows; +private boolean loggingEnabled; + +public enum Type { +RIGHT, +LEFT +} + +public OuterStreamJoinStoreFactory( +final String name, +final StreamJoinedInternal streamJoined, +final JoinWindows windows, +final Type type +) { +super(streamJoined.dslStoreSuppliers()); +this.name = buildOuterJoinWindowStoreName(streamJoined, name, type) + "-store"; +this.streamJoined = streamJoined; +this.windows = windows; +this.loggingEnabled = streamJoined.loggingEnabled(); +} + +@Override +public StateStore build() { +final Duration retentionPeriod = 
Duration.ofMillis(retentionPeriod()); +final Duration windowSize = Duration.ofMillis(windows.size()); +final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); +final long retentionMs = validateMillisecondDuration(retentionPeriod, rpMsgPrefix); +final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); +final long windowSizeMs = validateMillisecondDuration(windowSize, wsMsgPrefix); + +if (retentionMs < 0L) { +throw new IllegalArgumentException("retentionPeriod cannot be negative"); +} +if (windowSizeMs < 0L) { +throw new IllegalArgumentException("windowSize cannot be negative"); +} +if (windowSizeMs > retentionMs) { +throw new IllegalArgumentException("The retention period of the window store " ++ name + " must be no smaller than its window size. Got size=[" ++ windowSizeMs + "], retention=[" + retentionMs + "]"); +} + +final TimestampedKeyAndJoinSideSerde timestampedKeyAndJoinSideSerde = new TimestampedKe
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
xvrl commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1397923196 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value, but we only collect metrics with number values. + * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). 
+ * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type + * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 metrics: a rate and a count. For eg: + * org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it has to be + * created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. + * + * + * + * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurables is converted to a GAUGE_DOUBLE. + * + * + * + * Percentiles work the same way as Frequencies. The only difference is that it is compose
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397751986 ## core/src/main/java/kafka/server/ClientMetricsManager.java: ## @@ -30,17 +69,376 @@ public class ClientMetricsManager implements Closeable { private static final Logger log = LoggerFactory.getLogger(ClientMetricsManager.class); private static final ClientMetricsManager INSTANCE = new ClientMetricsManager(); +private static final List SUPPORTED_COMPRESSION_TYPES = Collections.unmodifiableList( +Arrays.asList(CompressionType.ZSTD.id, CompressionType.LZ4.id, CompressionType.GZIP.id, CompressionType.SNAPPY.id)); public static ClientMetricsManager instance() { return INSTANCE; } +// Max cache size (16k active client connections per broker) +private static final int CM_CACHE_MAX_SIZE = 16384; +private final Cache clientInstanceCache; +private final Map subscriptionMap; +private final Time time; + +// The latest subscription version is used to determine if subscription has changed and needs +// to re-evaluate the client instance subscription id as per changed subscriptions. +private final AtomicInteger subscriptionUpdateVersion; + +private ClientMetricsManager() { +this(Time.SYSTEM); +} + +// Visible for testing +ClientMetricsManager(Time time) { +this.subscriptionMap = new ConcurrentHashMap<>(); +this.subscriptionUpdateVersion = new AtomicInteger(0); +this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CM_CACHE_MAX_SIZE)); +this.time = time; +} public void updateSubscription(String subscriptionName, Properties properties) { -// TODO: Implement the update logic to manage subscriptions. +// Validate the subscription properties. +ClientMetricsConfigs.validate(subscriptionName, properties); +// IncrementalAlterConfigs API will send empty configs when all the configs are deleted +// for respective subscription. In that case, we need to remove the subscription from the map. 
+if (properties.isEmpty()) { +// Remove the subscription from the map if it exists, else ignore the config update. +if (subscriptionMap.containsKey(subscriptionName)) { +log.info("Removing subscription [{}] from the subscription map", subscriptionName); +subscriptionMap.remove(subscriptionName); +this.subscriptionUpdateVersion.incrementAndGet(); +} +return; +} + +updateClientSubscription(subscriptionName, new ClientMetricsConfigs(properties)); +} + +public GetTelemetrySubscriptionsResponse processGetTelemetrySubscriptionRequest( +GetTelemetrySubscriptionsRequest request, int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +long now = time.milliseconds(); +Uuid clientInstanceId = Optional.ofNullable(request.data().clientInstanceId()) +.filter(id -> !id.equals(Uuid.ZERO_UUID)) +.orElse(generateNewClientId()); + +/* + Get the client instance from the cache or create a new one. If subscription has changed + since the last request, then the client instance will be re-evaluated. Validation of the + request will be done after the client instance is created. If client issues another get + telemetry request prior to push interval, then the client should get a throttle error but if + the subscription has changed since the last request then the client should get the updated + subscription immediately. +*/ +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext); + +try { +// Validate the get request parameters for the client instance. 
+validateGetRequest(request, clientInstance, now); +} catch (ApiException exception) { +return request.getErrorResponse(throttleMs, exception); +} + +clientInstance.lastKnownError(Errors.NONE); +return createGetSubscriptionResponse(clientInstanceId, clientInstance, telemetryMaxBytes, throttleMs); +} + +public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest request, +int telemetryMaxBytes, RequestContext requestContext, int throttleMs) { + +Uuid clientInstanceId = request.data().clientInstanceId(); +if (clientInstanceId == null || Uuid.RESERVED.contains(clientInstanceId)) { +String msg = String.format("Invalid request from the client [%s], invalid client instance id", +clientInstanceId); +return request.getErrorResponse(throttleMs, new InvalidRequestException(msg)); +} + +long now = time.milliseconds(); +ClientMetricsInstance clientInstance = clientInstance(clientInstanceId, requestContext);
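The `updateSubscription` logic quoted above treats an empty property set as a delete and bumps a version counter so that cached client instances can detect the change. A minimal, self-contained sketch of that pattern follows. The class name and its `contains`/`version` helpers are hypothetical, it uses `Map.remove` in place of the `containsKey` plus `remove` pair in the diff, and it assumes that a non-empty update also increments the version (the quoted diff does not show what `updateClientSubscription` does):

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the ClientMetricsManager from the PR.
final class SubscriptionRegistrySketch {

    private final Map<String, Properties> subscriptions = new ConcurrentHashMap<>();
    private final AtomicInteger updateVersion = new AtomicInteger(0);

    void update(final String name, final Properties properties) {
        if (properties.isEmpty()) {
            // Empty configs mean "delete": only bump the version if something was removed.
            if (subscriptions.remove(name) != null) {
                updateVersion.incrementAndGet();
            }
            return;
        }
        // Assumption: every real update also changes the version.
        subscriptions.put(name, properties);
        updateVersion.incrementAndGet();
    }

    boolean contains(final String name) {
        return subscriptions.containsKey(name);
    }

    int version() {
        return updateVersion.get();
    }

    public static void main(final String[] args) {
        final SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
        final Properties props = new Properties();
        props.setProperty("interval.ms", "1000");
        registry.update("sub-1", props);
        registry.update("sub-1", new Properties());
        System.out.println(registry.contains("sub-1") + " " + registry.version());
    }
}
```

Using the return value of `remove` makes the check and the removal a single map operation, which avoids a window between `containsKey` and `remove` when several threads update the same subscription.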
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
xvrl commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1397895468 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value, but we only collect metrics with number values. + * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). 
+ * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type + * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 metrics: a rate and a count. For eg: + * org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it has to be + * created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. + * + * + * + * Frequencies is created with an array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered. So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurables is converted to a GAUGE_DOUBLE. + * + * + * + * Percentiles work the same way as Frequencies. The only difference is that it is compose
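The conversion rules stated in the javadoc quoted above can be summarised in a small sketch. Only the rules that the visible text states explicitly are encoded (Total to CUMULATIVE_DOUBLE, sample Sum to GAUGE_DOUBLE, Frequencies components to GAUGE_DOUBLE, and a Meter reported as a rate plus a total). The class, enum, and method names are hypothetical, and this is not the `KafkaMetricsCollector` code:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical summary of the javadoc's stated rules; not KafkaMetricsCollector.
final class MetricConversionSketch {

    enum SourceKind { TOTAL, SUM, FREQUENCY_COMPONENT }

    enum ReportedType { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

    static ReportedType convert(final SourceKind kind) {
        switch (kind) {
            case TOTAL:
                // Treated as monotonically increasing "unless we find a valid counter-example".
                return ReportedType.CUMULATIVE_DOUBLE;
            case SUM:
                // A sample sum is not cumulative.
                return ReportedType.GAUGE_DOUBLE;
            case FREQUENCY_COMPONENT:
                // The compound type cannot be recovered, so each component is a gauge.
                return ReportedType.GAUGE_DOUBLE;
            default:
                throw new IllegalArgumentException("Unhandled kind: " + kind);
        }
    }

    // A Meter is never reported as-is; it appears as a rate metric and a total metric.
    static List<String> meterComponents(final String meterName) {
        return Arrays.asList(meterName + "-rate", meterName + "-total");
    }

    public static void main(final String[] args) {
        System.out.println(convert(SourceKind.TOTAL));
        System.out.println(meterComponents("connection-close"));
    }
}
```

Note that the javadoc's opening sentence groups Total and Sum together as monotonically increasing, while a later paragraph sends Sum to GAUGE_DOUBLE; the sketch follows the later, more specific statement.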
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on PR #14699: URL: https://github.com/apache/kafka/pull/14699#issuecomment-1817091571 Thanks @junrao for leaving the comments, I have tried to address them.
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397875576 ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +clientInstance = new ClientMetricsInstance(Uuid.randomUuid(), instanceMetadata, 0, 0, +null, ClientMetricsConfigs.DEFAULT_INTERVAL_MS); +} + +@Test +public void testMaybeUpdateRequestEpochValid() { +// First request should be accepted. 
+ assertTrue(clientInstance.maybeUpdateGetRequestEpoch(System.currentTimeMillis())); Review Comment: The class and methods I am testing do not hold a `Time` reference; they only try to update the epoch that is supplied, so I didn't see any value in using MockTime. However, I did change the code and tests for ClientMetricsManager, where these methods are invoked from: ClientMetricsManager now uses Time and ClientMetricsManagerTest uses the corresponding MockTime, which eliminated the use of `Thread.sleep`. Please let me know if I am missing anything here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
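The point in the comment above about replacing `Thread.sleep` with a mock clock can be illustrated with a minimal sketch. It is not the PR's code: `IntervalGate` is a hypothetical class, and a plain `LongSupplier` stands in for Kafka's `Time`/`MockTime` so the example stays self-contained.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in, not a Kafka class: accepts a request only if at least
// minIntervalMs has elapsed since the last accepted one.
final class IntervalGate {
    private final LongSupplier clockMs; // injected time source (assumption: millisecond clock)
    private final long minIntervalMs;
    private boolean acceptedBefore = false;
    private long lastAcceptedMs = 0L;

    IntervalGate(LongSupplier clockMs, long minIntervalMs) {
        this.clockMs = clockMs;
        this.minIntervalMs = minIntervalMs;
    }

    // Reads the injected clock instead of calling System.currentTimeMillis() directly.
    synchronized boolean tryAccept() {
        long now = clockMs.getAsLong();
        if (acceptedBefore && now - lastAcceptedMs < minIntervalMs) {
            return false; // too soon after the previous accepted request
        }
        acceptedBefore = true;
        lastAcceptedMs = now;
        return true;
    }
}
```

Production code could pass `System::currentTimeMillis`, while a test could pass the `get` method of an `AtomicLong` and advance it explicitly, so no real waiting is needed.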
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397873419 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: I have moved the `ClientMetricsManager` static instance and removed the code from ControllerServer.scala, now initialization only happens in `BrokerServer.scala`. Please let me know your thought for `ClientMetricsReceiverPlugin`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397871398 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: I have removed the conditional logic in this class and set `0` as the throttle time in case of error in ClientMetricsManager. Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
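The resolution described above (no conditional in the request class, and the caller passing `0` as the throttle time on errors) might look roughly like the sketch below. All class names and the error-code constant are hypothetical, simplified stand-ins, not the Kafka classes or the PR's final code.

```java
// Hypothetical, simplified stand-ins for illustration; these are not the Kafka classes.
final class SketchTelemetryResponse {
    final short errorCode;
    final int throttleTimeMs;

    SketchTelemetryResponse(short errorCode, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.throttleTimeMs = throttleTimeMs;
    }
}

final class SketchTelemetryRequest {
    // No per-error special-casing: the response records whatever the caller passes.
    static SketchTelemetryResponse errorResponse(int throttleTimeMs, short errorCode) {
        return new SketchTelemetryResponse(errorCode, throttleTimeMs);
    }
}

final class SketchMetricsManager {
    // Illustrative value only; not Kafka's actual error code.
    static final short QUOTA_EXCEEDED = 1;

    // The caller owns the decision: this error path passes 0 so that the client
    // has no throttle time to act on and does not mute its channel.
    static SketchTelemetryResponse rejectTooFrequentPush() {
        return SketchTelemetryRequest.errorResponse(0, QUOTA_EXCEEDED);
    }
}
```

Keeping the response builder free of error-specific branches leaves a single place, the caller, where the throttling policy is decided.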
Re: [PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]
soarez commented on PR #14794: URL: https://github.com/apache/kafka/pull/14794#issuecomment-1817065152 @dajac please take a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix unstable sorting in AssignmentsManagerTest [kafka]
soarez opened a new pull request, #14794: URL: https://github.com/apache/kafka/pull/14794 Building AssignReplicasToDirsRequestData relies on iteration over Map entries, which can result in different sorting order. The order does not matter to the semantics of the request, but it can cause issues with test assertions. This issue was introduced in #14369 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
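The ordering problem described in this PR can be sketched as follows. This is only an illustration of one common remedy (sorting entries before comparing), not necessarily the fix applied in the PR; `DeterministicOrder` and the `dirA`/`dirB` keys used in the usage note are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not from the PR: renders map entries in a stable order
// so that test assertions do not depend on Map iteration order.
final class DeterministicOrder {
    static List<String> sortedEntries(Map<String, Integer> assignments) {
        return assignments.entrySet().stream()
            .sorted(Map.Entry.comparingByKey()) // impose a total order by key
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.toList());
    }
}
```

Two maps holding the same entries, for example `dirA -> 1` and `dirB -> 2` inserted in opposite orders, produce equal lists under this helper, so an equality assertion on the result is stable.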
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1397839267 ## streams/src/main/java/org/apache/kafka/streams/query/TimestampedRangeQuery.java: ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.kafka.streams.query; + + +import org.apache.kafka.common.annotation.InterfaceStability.Evolving; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.ValueAndTimestamp; + + +import java.util.Optional; + +/** + * Interactive query for issuing range queries and scans over Timestamped + * KeyValue stores. Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: I noticed that sometimes we do set `throttleTimeMs` with an error. However, this only happens in KafkaApis with the generic request throttling logic. So, for now, I'd recommend that we get rid of this check here and rely on the caller to set `throttleTimeMs` properly. Specifically, for all `errorResponse(int throttleTimeMs, Errors errors)` and `getErrorResponse(int throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass in 0 for `throttleTimeMs`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: As discussed earlier, for `THROTTLING_QUOTA_EXCEEDED`, we shouldn't set `throttleTimeMs` since we don't want the client to mute the channel for all requests. Also, I noticed that sometimes we do set `throttleTimeMs` with an error. However, this only happens in KafkaApis with the generic request throttling logic. So, for now, I'd recommend that we get rid of this check here and rely on the caller to set `throttleTimeMs` properly. For example, for all `errorResponse(int throttleTimeMs, Errors errors) `and `getErrorResponse(int throttleTimeMs, Throwable e)` calls in `ClientMetricsManager`, we should pass in 0 for `throttleTimeMs`. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397795152 ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: Yeah, I re-checked the code today and we shouldn't require any change in ControllerServer.scala as the APIs are not supported on controller and we won't require even KIP-1000 to be supported on controllers. 
I am making the change to initialize ClientMetricsManager only from KafkaServer.scala. What do you think about `ClientMetricsReceiverPlugin`, based on its usage in the tagged PR above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1397715022 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -104,20 +103,16 @@ public void init(final ProcessorContext context) { internalProcessorContext = (InternalProcessorContext) context; final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); -droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); +droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), +metrics); Review Comment: ditto ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -609,17 +842,18 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records +// push one item to the other window that has a join; +// this should produce the joined record first; +// then non-joined record with a closed window // by the time they were produced before // w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // w2 = { } // --> w1 = { 0:A0 (ts: 0), 1:A1 (ts: 100) } // --> w2 = { 1:a1 (ts: 110) } inputTopic2.pipeInput(1, "a1", 110L); processor.checkAndClearProcessResult( -new KeyValueTimestamp<>(0, "A0+null", 0L), -new KeyValueTimestamp<>(1, "A1+a1", 110L) +new KeyValueTimestamp<>(1, "A1+a1", 110L), +new KeyValueTimestamp<>(0, "A0+null", 0L) Review Comment: This is because previously we look at outer store and then join. But this change make it we join first and then look at outer store. The ts in outer store and other store is hard to reason. If we change the ts of 0 to be 100 and ts of 1 to be 50, the original test would still produce 0 first which has larger ts... 
So unless we compare the ts of join and outer at the same time when we output, we can't guarantee the order of ts in the output. ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -60,24 +61,21 @@ class KStreamKStreamJoin implements ProcessorSupplier joiner, - final boolean outer, - final Optional outerJoinWindowName, - final TimeTrackerSupplier sharedTimeTrackerSupplier) { +KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, final JoinWindowsInternal windows, +final ValueJoinerWithKey joiner, final boolean outer, +final Optional outerJoinWindowName, final TimeTrackerSupplier sharedTimeTrackerSupplier) { Review Comment: Change this back? I think for Kafka Streams, the convention is to align with the first param's indentation ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -165,40 +155,52 @@ public void process(final Record record) { // problem: // // Say we have a window size of 5 seconds -// 1. A non-joined record with time T10 is seen in the left-topic (maxLeftStreamTime: 10) -// The record is not processed yet, and is added to the outer-join store -// 2. A non-joined record with time T2 is seen in the right-topic (maxRightStreamTime: 2) -// The record is not processed yet, and is added to the outer-join store -// 3. A joined record with time T11 is seen in the left-topic (maxLeftStreamTime: 11) -// It is time to look at the expired records. T10 and T2 should be emitted, but -// because T2 was late, then it is not fetched by the window store, so it is not processed +// 1. A non-joined record with time T10 is seen in the left-topic +// (maxLeftStreamTime: 10) +// The record is not processed yet, and is added to the outer-join store +// 2. A non-joined record with time T2 is seen in the right-topic +// (maxRightStreamTime: 2) +// The record is not processed yet, and is added to the outer-join store +// 3.
A joined record with time T11 is seen in the left-topic +// (maxLeftStreamTime: 11) +// It is time to look at the expired records. T10 and T2 should be
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
junrao commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1397714746 ## clients/src/main/java/org/apache/kafka/common/requests/PushTelemetryRequest.java: ## @@ -60,17 +62,47 @@ public PushTelemetryRequest(PushTelemetryRequestData data, short version) { @Override public PushTelemetryResponse getErrorResponse(int throttleTimeMs, Throwable e) { -PushTelemetryResponseData responseData = new PushTelemetryResponseData() -.setErrorCode(Errors.forException(e).code()) -.setThrottleTimeMs(throttleTimeMs); -return new PushTelemetryResponse(responseData); +return errorResponse(throttleTimeMs, Errors.forException(e)); } @Override public PushTelemetryRequestData data() { return data; } +public PushTelemetryResponse errorResponse(int throttleTimeMs, Errors errors) { +PushTelemetryResponseData responseData = new PushTelemetryResponseData(); +responseData.setErrorCode(errors.code()); +/* + THROTTLING_QUOTA_EXCEEDED is thrown in telemetry APIs when the telemetry request + arrives prior the configured minimum interval between two consecutive telemetry requests. + In this case, the throttleTimeMs should not be included in the response to avoid the + muting of the client channel for all the other requests. +*/ +if (Errors.THROTTLING_QUOTA_EXCEEDED != errors) { Review Comment: We shouldn't set throttleTimeMs even for Errors.THROTTLING_QUOTA_EXCEEDED. throttleTimeMs can only be set when there is no error. ## core/src/main/java/kafka/metrics/ClientMetricsReceiverPlugin.java: ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.requests.PushTelemetryRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.server.telemetry.ClientTelemetryReceiver; + +import java.util.ArrayList; +import java.util.List; + +/** + * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka + * server to export client metrics to the registered receivers. + */ +public class ClientMetricsReceiverPlugin { + +private static final ClientMetricsReceiverPlugin INSTANCE = new ClientMetricsReceiverPlugin(); Review Comment: Hmm, why is ClientMetricsManager needed in the controller? I thought only brokers can receive get/push telemetric requests. ## core/src/test/java/kafka/metrics/ClientMetricsInstanceTest.java: ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.Uuid; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.UnknownHostException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ClientMetricsInstanceTest { + +private Uuid uuid; +private ClientMetricsInstanceMetadata instanceMetadata; +private ClientMetricsInstance clientInstance; + +@BeforeEach +public void setUp() throws UnknownHostException { +uuid = Uuid.randomUuid(); +instanceMetadata = new ClientMetricsInstanceMetadata(uuid, +ClientMetricsTestUtils.requestContext()); +clientInstance = new Client
Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
AndrewJSchofield commented on PR #14789: URL: https://github.com/apache/kafka/pull/14789#issuecomment-1816929338 Test failures reviewed and unrelated to the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [LI-HOTFIX] Enable register watch when getting children for federated topic namespaces [kafka]
kehuum closed pull request #14793: [LI-HOTFIX] Enable register watch when getting children for federated topic namespaces URL: https://github.com/apache/kafka/pull/14793 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [LI-HOTFIX] Enable register watch when getting children for federated topic namespaces [kafka]
kehuum opened a new pull request, #14793: URL: https://github.com/apache/kafka/pull/14793 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* This pr sets registerWatch to true when getting children for federated topic namespaces so that the corresponding znode change handler can be registered and triggered properly. LI_DESCRIPTION = To register and trigger znode change listener under /federatedTopics/namespace path, we need to set registerWatch to true when getting its children. Tested with snapshot build in kilo cluster. EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been migrated to xinfra clients and all topic CUDs go through xmd, then we don't need kafka broker to understand both old and new topic acl, it will only need to understand the new acl. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]
gharris1727 commented on PR #14762: URL: https://github.com/apache/kafka/pull/14762#issuecomment-1816905171 > What was the conclusion here? Is triggering wakeup on a separate thread (through WorkerSinkTask::stop) before also eventually closing the consumer on a separate thread (through WorkerSinkTask::cancel) safe? Looks like it probably might be, but I was only able to take a cursory look. I don't think it is. For example, it looks like doCommitSync catches the WakeupException and then makes another call to the consumer, and that call could be ongoing when cancel()/close() is called. I think also that after wakeup() exits, there's no guarantee that the other thread has actually thrown the WakeupException and released the consumer lock. Since the plugin has access to the WorkerSinkTaskContext, it can also perform an infinite loop and catch all of the WakeupExceptions. I think that preventing ConcurrentModificationExceptions completely will require heavy synchronization, which gives more opportunities for the task thread to block the herder thread. If we assume that framework has no infinite loops (which may not be the case given the behavior of doCommitSync), we could make cancellation only happen if the task thread is in plugin code, and the consumer is unlikely to be in use. This could be done with an AtomicReference, which is the same synchronization primitive that the Consumer is using to fire the ConcurrentModificationExceptions. I'll look into it more, I don't think we should merge this change until we have a handle on the CMEs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
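The idea floated above, allowing cancellation only while the task thread is inside plugin code and guarding that with an AtomicReference, could be sketched as below. This is a rough illustration under stated assumptions, not a proposed implementation: `CancellationGate` and its state names are hypothetical, and the sketch ignores the harder questions raised in the comment (such as plugins that loop forever).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not Connect code: a tiny state machine that lets another
// thread cancel only while the task thread is known to be inside plugin code.
final class CancellationGate {
    enum State { FRAMEWORK, PLUGIN, CANCELLED }

    private final AtomicReference<State> state = new AtomicReference<>(State.FRAMEWORK);

    // Task thread: called just before handing control to the plugin.
    boolean enterPlugin() {
        return state.compareAndSet(State.FRAMEWORK, State.PLUGIN);
    }

    // Task thread: called when the plugin returns. A false result means the task
    // was cancelled meanwhile, so the thread should stop touching shared resources.
    boolean exitPlugin() {
        return state.compareAndSet(State.PLUGIN, State.FRAMEWORK);
    }

    // Cancelling thread: succeeds only if the task thread is in plugin code.
    boolean tryCancel() {
        return state.compareAndSet(State.PLUGIN, State.CANCELLED);
    }

    State current() {
        return state.get();
    }
}
```

Because every transition is a single compare-and-set, the cancelling thread never blocks on the task thread; it either wins the transition or learns that the task is currently in framework code and must retry later.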
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
rreddy-22 commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1397683148 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. 
+ */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. 
+ */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
[PR] MINOR: Log the ZK dual-write time [kafka]
mumrah opened a new pull request, #14792: URL: https://github.com/apache/kafka/pull/14792 We already capture the ZK dual write timings as metrics. This patch includes those timings in the existing log messages.
[jira] [Assigned] (KAFKA-15365) Broker-side replica management changes
[ https://issues.apache.org/jira/browse/KAFKA-15365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Omnia Ibrahim reassigned KAFKA-15365:

Assignee: Omnia Ibrahim

> Broker-side replica management changes
>
> Key: KAFKA-15365
> URL: https://issues.apache.org/jira/browse/KAFKA-15365
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Igor Soarez
> Assignee: Omnia Ibrahim
> Priority: Major
>
> On the broker side, process metadata changes to partition directories as the broker catches up to metadata, as described in KIP-858 under "Replica management".

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Do not check whether updating tasks exist in the waiting loop [kafka]
cadonna opened a new pull request, #14791: URL: https://github.com/apache/kafka/pull/14791 The state updater waits on a condition variable if no tasks exist that need to be updated. The condition variable is wrapped in a loop to account for spurious wake-ups. The check whether updating tasks exist is done in the condition of the loop. However, it is the state updater thread itself that changes whether updating tasks exist, and since that thread is waiting on the condition variable, the check for the existence of updating tasks will always return the same value for as long as the thread is waiting. Thus, the check only needs to be done once, before entering the loop. This PR moves the check before the loop, which also makes the usage of mocks more robust since the processing becomes more deterministic. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
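The change described in this PR can be illustrated with a small sketch. It is not the state updater's code: `WaitLoopSketch`, `pendingActions`, `updatingTasks`, and the method names are hypothetical, and the sketch assumes that `updatingTasks` is only ever touched by the waiting thread while `pendingActions` is filled by other threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; not the Kafka Streams state updater.
final class WaitLoopSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition workAdded = lock.newCondition();
    // Filled by other threads, always under the lock.
    private final Deque<String> pendingActions = new ArrayDeque<>();
    // Assumption: read and written only by the updater thread itself.
    private final List<String> updatingTasks = new ArrayList<>();

    // Called by other threads to hand work to the updater thread.
    void addAction(String action) {
        lock.lock();
        try {
            pendingActions.add(action);
            workAdded.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Called by the updater thread only.
    void startUpdating(String task) {
        updatingTasks.add(task);
    }

    // Called by the updater thread; returns true if it actually had to wait.
    boolean waitIfIdle() {
        // Checked once, before the loop: the value cannot change while this
        // thread is blocked, because only this thread modifies updatingTasks.
        if (!updatingTasks.isEmpty()) {
            return false;
        }
        boolean waited = false;
        lock.lock();
        try {
            // The loop re-checks only state that other threads can change,
            // which is what protects against spurious wake-ups.
            while (pendingActions.isEmpty()) {
                workAdded.awaitUninterruptibly();
                waited = true;
            }
        } finally {
            lock.unlock();
        }
        return waited;
    }
}
```

With the thread-local check outside the loop, each wake-up evaluates only the shared condition, so a test that mocks the task collection sees one call per `waitIfIdle()` instead of one per wake-up.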
[PR] KAFKA-15363: Broker log directory failure changes [kafka]
OmniaGM opened a new pull request, #14790: URL: https://github.com/apache/kafka/pull/14790

- Extend the ReplicaAlterLogDirsThread to send an AssignReplicasToDirsRequestData RPC before promoting the future replica in JBOD mode with KRaft.
- Extend AssignmentsManager and DirectoryEventHandler to attach a callback to the events so we can track the state.
- The state of the RPCs is kept in memory in ReplicaAlterLogDirsThread, and it gets updated using the callback that is sent to AssignmentsManager. Once we promote the future replica, we clear the RPC state related to that replica.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
dajac commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1397287511

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
@@ -829,21 +902,50 @@ private CoordinatorResult consumerGr
 // Get or create the member.
 if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-if (memberEpoch == 0) {
-log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+ConsumerGroupMember member;
+ConsumerGroupMember.Builder updatedMemberBuilder;
+ConsumerGroupMember updatedMember;
+boolean staticMemberReplaced = false;
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+if (createIfNotExists) {
+log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+}
+updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+} else {
+member = group.staticMember(instanceId);

Review Comment: I still find this logic quite complex to follow. I wonder if we could be a little more explicit. I think that the complexity comes from `throwIfStaticMemberValidationFails`, which hides quite a lot of the logic. I wonder if something like the following would be better. I am not sure... What do you think?

```
ConsumerGroupMember existingMember = group.staticMember(instanceId);
if (memberEpoch == 0) {
    // A new static member joins or the existing static member rejoins.
    if (existingMember == null) {
        // New static member.
        member = group.getOrMaybeCreateMember(memberId, true);
        updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
    } else if (existingMember.memberId().equals(memberId)) {
        // Static member rejoins the group with epoch 0.
        member = existingMember;
        updatedMemberBuilder = new ConsumerGroupMember.Builder(existingMember);
    } else {
        // Static member rejoins with a different member id so it should replace
        // the previous instance iff the previous instance has -2.
        if (existingMember.memberEpoch() != LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            // The new member can't join.
            throw Errors.UNRELEASED_INSTANCE_ID.exception(...);
        } else {
            // Replace the current member.
            staticMemberReplaced = true;
            member = existingMember;
            updatedMemberBuilder = new ConsumerGroupMember.Builder(group.getOrMaybeCreateMember(memberId, true));
            removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
        }
    }
} else {
    // Check member id or throw FENCED_INSTANCE_ID
    // Check epoch with throwIfMemberEpochIsInvalid
}
```

Note that I just wrote this without testing it, so the code is likely not 100% correct :).
Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]
dajac commented on code in PR #14582: URL: https://github.com/apache/kafka/pull/14582#discussion_r1397610751 ## tests/kafkatest/tests/core/transactions_test.py: ## @@ -246,8 +246,9 @@ def setup_topics(self): @matrix(failure_mode=["hard_bounce", "clean_bounce"], bounce_target=["brokers", "clients"], check_order=[True, False], -use_group_metadata=[True, False]) -def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.all): +use_group_metadata=[True, False], +use_new_coordinator=[True, False]) Review Comment: All the kraft tests failed in that one...
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
dajac commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1397600750 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. 
+ */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. 
+ */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15484: General Rack Aware Assignor [kafka]
dajac commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1397596576 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * Balance: Ensure partitions are distributed equally among all members. + *The difference in assignments sizes between any two members + *should not exceed one partition. + * Rack Matching:When feasible, aim to assign partitions to members + *located on the same rack thus avoiding cross-zone traffic. + * Stickiness: Minimize partition movements among members by retaining + *as much of the existing assignment as possible. + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); +/** + * The member metadata obtained from the assignment specification. 
+ */ +private final Map members; + +/** + * The topic and partition metadata describer. + */ +private final SubscribedTopicDescriber subscribedTopicDescriber; + +/** + * The list of all the topic Ids that the consumer group is subscribed to. + */ +private final Set subscriptionIds; + +/** + * Rack information. + */ +private final RackInfo rackInfo; + +/** + * List of subscribed members for each topic. + */ +private final Map> membersPerTopic; + +/** + * The partitions that still need to be assigned. + */ +private final Set unassignedPartitions; + +/** + * All the partitions that have been retained from the existing assignment. + */ +private final Set assignedStickyPartitions; + +/** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ +private final AssignmentManager assignmentManager; + +/** + * List of all the members sorted by their respective assignment sizes. + */ +private final TreeSet sortedMembersByAssignmentSize; + +/** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ +private final Map currentPartitionOwners; + +/** + * Tracks the owner of each partition in the target assignment. + */ +private final Map partitionOwnerInTargetAssignment; + +/** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ +private PartitionMovements partitionMovements; + +/** + * The new assignment that will be returned. 
+ */ +private final Map targetAssignment; + +public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { +this.members = assignmentSpec.members(); +this.subscribedTopicDescriber = subscribedTopicDescriber; +this.subscriptionIds = assignmentSpec.members().values().stream() +.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) +.peek(topicId -> { +int partitionCount = subscribedTopicDescriber.numPartitions(topicId); +if (partitionCount == -1) { +throw new PartitionAssignorException( +"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." +); +} +}) +.collect(Collectors.toSet()); +this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); +
Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]
soarez commented on code in PR #14737: URL: https://github.com/apache/kafka/pull/14737#discussion_r1397572189 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -143,21 +143,30 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w private def getOfflineReplicas(image: MetadataImage, partition: PartitionRegistration, listenerName: ListenerName): util.List[Integer] = { -// TODO: in order to really implement this correctly, we would need JBOD support. -// That would require us to track which replicas were offline on a per-replica basis. -// See KAFKA-13005. val offlineReplicas = new util.ArrayList[Integer](0) for (brokerId <- partition.replicas) { Option(image.cluster().broker(brokerId)) match { case None => offlineReplicas.add(brokerId) -case Some(broker) => if (broker.fenced() || !broker.listeners().containsKey(listenerName.value())) { +case Some(broker) => if (isReplicaOffline(partition, listenerName, broker)) { offlineReplicas.add(brokerId) } } } offlineReplicas } + private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) = { +broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isInOfflineDir(broker, partition) + } + + private def isInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean = { Review Comment: Good point. Missed that. Updated, thanks!
Re: [PR] KAFKA-15357: Aggregate and propagate assignments [kafka]
dajac commented on PR #14369: URL: https://github.com/apache/kafka/pull/14369#issuecomment-1816721578 I've also seen `Build / JDK 8 and Scala 2.12 / testAssignmentAggregation() – kafka.server.AssignmentsManagerTest` failing consistently in a few builds ([example](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2392/tests)). @soarez Could you please double check this test?
Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]
pprovenzano commented on code in PR #14737: URL: https://github.com/apache/kafka/pull/14737#discussion_r1397547750 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -143,21 +143,30 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w private def getOfflineReplicas(image: MetadataImage, partition: PartitionRegistration, listenerName: ListenerName): util.List[Integer] = { -// TODO: in order to really implement this correctly, we would need JBOD support. -// That would require us to track which replicas were offline on a per-replica basis. -// See KAFKA-13005. val offlineReplicas = new util.ArrayList[Integer](0) for (brokerId <- partition.replicas) { Option(image.cluster().broker(brokerId)) match { case None => offlineReplicas.add(brokerId) -case Some(broker) => if (broker.fenced() || !broker.listeners().containsKey(listenerName.value())) { +case Some(broker) => if (isReplicaOffline(partition, listenerName, broker)) { offlineReplicas.add(brokerId) } } } offlineReplicas } + private def isReplicaOffline(partition: PartitionRegistration, listenerName: ListenerName, broker: BrokerRegistration) = { +broker.fenced() || !broker.listeners().containsKey(listenerName.value()) || isInOfflineDir(broker, partition) + } + + private def isInOfflineDir(broker: BrokerRegistration, partition: PartitionRegistration): Boolean = { Review Comment: Do we have to handle UNASSIGNED here? It would return false.
Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
AndrewJSchofield commented on code in PR #14789: URL: https://github.com/apache/kafka/pull/14789#discussion_r1397505461 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() { assertEquals(5, recordsToTest.get(1).offset()); } +/** + * KAFKA-15836: + * Test that max.poll.records is honoured when consuming from multiple topic-partitions and the + * fetched records are not aligned on max.poll.records boundaries. + * + * tp0 has records 1,2,3; tp1 has records 6,7,8 + * max.poll.records is 2 + * + * poll 1 should return 1,2 + * poll 2 should return 3,6 + * poll 3 should return 7,8 + * + * Or similar :) Review Comment: Well, I think both the client and the server have some kind of next-in-line behaviour. It is not guaranteed, but it does at least attempt to avoid starvation.
Re: [PR] KAFKA-15802: Validate remote segment state before fetching index [kafka]
mimaison commented on PR #14759: URL: https://github.com/apache/kafka/pull/14759#issuecomment-1816625357 Is the work complete? If so can we close [KAFKA-15802](https://issues.apache.org/jira/browse/KAFKA-15802)?
Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
kirktrue commented on PR #14789: URL: https://github.com/apache/kafka/pull/14789#issuecomment-1816615350 🤦‍♂️
Re: [PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
kirktrue commented on code in PR #14789: URL: https://github.com/apache/kafka/pull/14789#discussion_r1397472048 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java: ## @@ -1124,6 +1126,62 @@ public void testFetchMaxPollRecords() { assertEquals(5, recordsToTest.get(1).offset()); } +/** + * KAFKA-15836: + * Test that max.poll.records is honoured when consuming from multiple topic-partitions and the + * fetched records are not aligned on max.poll.records boundaries. + * + * tp0 has records 1,2,3; tp1 has records 6,7,8 + * max.poll.records is 2 + * + * poll 1 should return 1,2 + * poll 2 should return 3,6 + * poll 3 should return 7,8 + * + * Or similar :) Review Comment: I'm pretty sure that there's nothing in the current code that will attempt to pull _N_ records from _M_ partitions fairly. Is it possible that, if the number of records available for topic _T1_ is >= the value of `maxRecords` at every point in time, records produced to _T2_ will never be consumed?
[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records
[ https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787243#comment-17787243 ]

Andrew Schofield commented on KAFKA-15836:

I would say that it's not applicable to 3.6. From my reading, it looks like a refactoring in 3.7 introduced a regression that was not caught by unit tests.

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.6.0
> Reporter: Philip Nee
> Assignee: Andrew Schofield
> Priority: Blocker
> Labels: consumer
> Fix For: 3.6.1
>
> We discovered that when KafkaConsumer subscribes to multiple topics with max.poll.records configured, max.poll.records is not properly respected for all poll() invocations.
>
> I was able to reproduce it with the AK example; here is how I ran my tests: [https://github.com/apache/kafka/pull/14772]
>
> 1. Start ZooKeeper and the Kafka server (KRaft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>
> Here is what the program does: the producer produces a large number of records to multiple topics. We configure the consumer with max.poll.records = 400 and subscribe it to multiple topics. The consumer polls, and the number of returned records can sometimes be larger than 400.
>
> This is an issue in AK 3.6 but 3.5 was fine.
Re: [PR] KAFKA-15362: Resolve offline replicas in metadata cache [kafka]
soarez commented on PR #14737: URL: https://github.com/apache/kafka/pull/14737#issuecomment-1816570498 Thanks for the review @cmccabe. I think I have incorporated all your suggestions. PTAL
[PR] KAFKA-15836: KafkaConsumer subscribes to multiple topics does not respect max.poll.records [kafka]
AndrewJSchofield opened a new pull request, #14789: URL: https://github.com/apache/kafka/pull/14789 When returning fetched records, the code was not properly honouring `max.poll.records`. When it had fetched records for multiple topic-partitions, it was intended to accumulate records up to `max.poll.records`. Actually, whenever it accumulated records from a partition, it was prepared to add `max.poll.records` for that partition regardless of how many records it had already accumulated, rather than reducing the number of records to add based on records already added from other partitions. I added a test case which failed before the code change in the PR and passes afterwards. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
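The accumulation problem described in this pull request can be illustrated with a minimal sketch. It is not the Kafka fetch code: `MaxPollBudgetSketch`, its two methods, and the `tp0`/`tp1` sample data are hypothetical names chosen for illustration, and records are modelled as plain integers. The first method applies the limit per partition (the shape of the bug as described), the second reduces the number of records to add based on what was already accumulated from other partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka's fetch collection code.
final class MaxPollBudgetSketch {

    // Shape of the bug: every partition may contribute up to maxPollRecords,
    // regardless of how many records were already accumulated.
    static List<Integer> collectPerPartitionLimit(Map<String, List<Integer>> buffered, int maxPollRecords) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> records : buffered.values()) {
            int take = Math.min(maxPollRecords, records.size());
            out.addAll(records.subList(0, take));
        }
        return out;
    }

    // Shape of the fix: one shared budget that shrinks as records are added.
    static List<Integer> collectWithSharedBudget(Map<String, List<Integer>> buffered, int maxPollRecords) {
        List<Integer> out = new ArrayList<>();
        int remaining = maxPollRecords;
        for (List<Integer> records : buffered.values()) {
            if (remaining <= 0)
                break;
            int take = Math.min(remaining, records.size());
            out.addAll(records.subList(0, take));
            remaining -= take;
        }
        return out;
    }

    // Two partitions with three buffered records each (assumed sample data).
    static Map<String, List<Integer>> sampleBuffer() {
        Map<String, List<Integer>> buffered = new LinkedHashMap<>();
        buffered.put("tp0", Arrays.asList(1, 2, 3));
        buffered.put("tp1", Arrays.asList(6, 7, 8));
        return buffered;
    }

    public static void main(String[] args) {
        System.out.println("per-partition limit: " + collectPerPartitionLimit(sampleBuffer(), 2));
        System.out.println("shared budget:       " + collectWithSharedBudget(sampleBuffer(), 2));
    }
}
```

With a limit of 2, the per-partition variant returns four records from the sample buffer while the shared-budget variant returns two, which is the difference the PR description attributes to the fix.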
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
lucasbru commented on code in PR #14710: URL: https://github.com/apache/kafka/pull/14710#discussion_r1397321134 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -180,6 +180,18 @@ public void maybeAutoCommit(final Map offsets autocommit.setInflightCommitStatus(true); } +/** + * The consumer needs to send an auto commit during the shutdown + */ +public List maybeAutoCommitOnClose() { Review Comment: Can this be package-private? Could this return an optional instead of a list? I'd drop the "onClose" because that doesn't seem to matter here. The method does a `maybeAutoCommit`. The fact that it's executed on close belongs to the calling context no? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManager.java: ## @@ -35,7 +35,7 @@ public interface RequestManager { * synchronization protection in this method's implementation. * * - * + *Close( Review Comment: whoops ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -561,23 +583,27 @@ boolean hasUnsentRequests() { OffsetCommitRequestState addOffsetCommitRequest(final Map offsets) { Review Comment: nit: put overloads next to each other in the file (here you put `createOffsetCommitRequest` in between the two overloads for `addOffsetCommitRequest`) ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -231,6 +243,17 @@ private void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs } } +@Override +public NetworkClientDelegate.PollResult pollOnClose() { +if (!pendingRequests.hasUnsentRequests() || !coordinatorRequestManager.coordinator().isPresent()) +return EMPTY; + +sendAutoCommit(subscriptions.allConsumed()); Review Comment: We seem to have two code paths here dealing with the auto commit and I haven't quite grasped what is the idea here. 
For example, we seem to be creating the auto-commit request twice during close:
```
ConsumerNetworkThread.coordinatorOnClose
  -> ConsumerNetworkThread.closingTasks
  -> ConsumerNetworkThread.maybeAutoCommitOnClose
  -> CommitRequestManager.maybeAutoCommitOnClose
  -> CommitRequestManager.createOffsetCommitRequest
```
As well as:
```
ConsumerNetworkThread.runAtClose
  -> CommitRequestManager.pollOnClose
  -> CommitRequestManager.sendAutoCommit
  -> CommitRequestManager.addOffsetCommitRequest
  -> CommitRequestManager.createOffsetCommitRequest
```
Could you add a bit of context to the PR so that it's easier for me to understand the thinking behind the shutdown procedure?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ##
@@ -257,10 +255,62 @@ private void closeInternal(final Duration timeout) {
 void cleanup() {
 log.trace("Closing the consumer network thread");
 Timer timer = time.timer(closeTimeout);
+coordinatorOnClose(timer);
 runAtClose(requestManagers.entries(), networkClientDelegate, timer);
 closeQuietly(requestManagers, "request managers");
 closeQuietly(networkClientDelegate, "network client delegate");
 closeQuietly(applicationEventProcessor, "application event processor");
 log.debug("Closed the consumer network thread");
 }
+
+void coordinatorOnClose(final Timer timer) {
+if (!requestManagers.coordinatorRequestManager.isPresent())
+return;
+
+connectCoordinator(timer);
+
+List tasks = closingTasks();
+do {
+long currentTimeMs = timer.currentTimeMs();
+connectCoordinator(timer);
+networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
+} while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+}
+
+private void connectCoordinator(final Timer timer) {
+while (!coordinatorReady()) {
+findCoordinatorSync(timer);
+}
+}
+
+private boolean coordinatorReady() {
+CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+Optional coordinator = 
coordinatorRequestManager.coordinator(); +return coordinator.isPresent() && !networkClientDelegate.isUnavailable(coordinator.get()); +} + +private void findCoordinatorSync(final Timer timer) { +CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get(); +long currentTimeMs = timer.currentTimeMs(); +NetworkClientDelegate.PollResult request = coordinatorRequestManager.pollOnClose(); +networkClientDelegate.addAll(request); +CompletableFuture findCoordinatorRe
[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records
[ https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787217#comment-17787217 ] Mickael Maison commented on KAFKA-15836: Awesome, thanks [~schofielaj]
[jira] [Resolved] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
[ https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mickael Maison resolved KAFKA-15793.
------------------------------------
    Resolution: Fixed

> Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
> ---------------------------------------------------------------
>
>                 Key: KAFKA-15793
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15793
>             Project: Kafka
>          Issue Type: Test
>    Affects Versions: 3.7.0, 3.6.1
>            Reporter: Divij Vaidya
>            Assignee: David Arthur
>            Priority: Major
>              Labels: flaky-test
>             Fix For: 3.7.0, 3.6.1
>
>         Attachments: Screenshot 2023-11-06 at 11.30.06.png
>
>
> The tests have been flaky since they were introduced in
> [https://github.com/apache/kafka/pull/14545] (see picture attached).
> The stack traces for the flakiness can be found at
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.zk.ZkMigrationIntegrationTest]
>
[jira] [Commented] (KAFKA-15793) Flaky test ZkMigrationIntegrationTest.testMigrateTopicDeletions
[ https://issues.apache.org/jira/browse/KAFKA-15793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787216#comment-17787216 ] Mickael Maison commented on KAFKA-15793: Since KAFKA-15799 is now done, marking this as resolved too
Re: [PR] KAFKA-15201: Allow git push to fail gracefully [kafka]
Owen-CH-Leung commented on PR #14645: URL: https://github.com/apache/kafka/pull/14645#issuecomment-1816422275 @divijvaidya shall we merge this PR ? Thanks
Re: [PR] KAFKA-15799 Handle full metadata updates on ZK brokers [kafka]
mumrah commented on PR #14719: URL: https://github.com/apache/kafka/pull/14719#issuecomment-1816420476 @mimaison yup, I closed it just now
Re: [PR] KAFKA-15038: Add metadatacache into RemoteLogManager, and refactor all relevant codes [kafka]
Owen-CH-Leung commented on PR #14136: URL: https://github.com/apache/kafka/pull/14136#issuecomment-1816419916 @kamalcph Thanks. Just rebased. Btw, when I rebased, I noticed that there's a failing test `kafka.server.AssignmentsManagerTest.testAssignmentAggregation` on trunk. Maybe we need to fix that as well.
[jira] [Updated] (KAFKA-15799) ZK brokers incorrectly handle KRaft metadata snapshots
[ https://issues.apache.org/jira/browse/KAFKA-15799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Arthur updated KAFKA-15799:
---------------------------------
    Fix Version/s: 3.7.0

> ZK brokers incorrectly handle KRaft metadata snapshots
> ------------------------------------------------------
>
>                 Key: KAFKA-15799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15799
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: David Arthur
>            Assignee: David Arthur
>            Priority: Major
>             Fix For: 3.7.0, 3.6.1
>
>
> While working on the fix for KAFKA-15605, I noticed that ZK brokers are
> unconditionally merging data from UpdateMetadataRequest with their existing
> MetadataCache. This is not the correct behavior when handling a metadata
> snapshot from the KRaft controller.
> For example, if a topic was deleted in KRaft and not transmitted as part of a
> delta update (e.g., during a failover) then the ZK brokers will never remove
> the topic from their cache (until they restart and rebuild their cache).
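The difference between merging an update and applying a snapshot, as described in this issue, can be sketched as follows. This is not the broker's MetadataCache or its handling of UpdateMetadataRequest: `MetadataCacheSketch`, `applyDelta`, and `applySnapshot` are hypothetical names, and the cache is reduced to a map from topic name to partition count purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the broker's MetadataCache.
final class MetadataCacheSketch {
    // topic name -> partition count (a deliberately simplified model)
    private final Map<String, Integer> topics = new HashMap<>();

    // Delta update: merge into the existing state. Topics that are absent
    // from the update are kept, so a missed deletion is never noticed.
    void applyDelta(Map<String, Integer> update) {
        topics.putAll(update);
    }

    // Snapshot: the update is the complete state, so anything not listed
    // in it must be dropped from the cache.
    void applySnapshot(Map<String, Integer> snapshot) {
        topics.clear();
        topics.putAll(snapshot);
    }

    boolean contains(String topic) {
        return topics.containsKey(topic);
    }

    public static void main(String[] args) {
        MetadataCacheSketch cache = new MetadataCacheSketch();
        Map<String, Integer> initial = new HashMap<>();
        initial.put("orders", 3);
        initial.put("deleted-topic", 1);
        cache.applySnapshot(initial);

        // The controller's full state no longer contains "deleted-topic".
        Map<String, Integer> full = new HashMap<>();
        full.put("orders", 3);

        cache.applyDelta(full);
        System.out.println("after merge:    " + cache.contains("deleted-topic"));
        cache.applySnapshot(full);
        System.out.println("after snapshot: " + cache.contains("deleted-topic"));
    }
}
```

Treating a full snapshot as if it were a delta is what leaves the stale topic behind in this model; replacing the state removes it.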
[jira] [Commented] (KAFKA-14507) Add ConsumerGroupPrepareAssignment API
[ https://issues.apache.org/jira/browse/KAFKA-14507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787193#comment-17787193 ]

Zihao Lin commented on KAFKA-14507:
-----------------------------------

Hi team,

If this ticket is open I will pick it up and try to add it.

> Add ConsumerGroupPrepareAssignment API
> --------------------------------------
>
>                 Key: KAFKA-14507
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14507
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: David Jacot
>            Assignee: Zihao Lin
>            Priority: Major
>
[jira] [Resolved] (KAFKA-15849) Fix ListGroups API when runtime partition size is zero
[ https://issues.apache.org/jira/browse/KAFKA-15849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot resolved KAFKA-15849.
---------------------------------
    Fix Version/s: 3.7.0
         Reviewer: David Jacot
       Resolution: Fixed

> Fix ListGroups API when runtime partition size is zero
> ------------------------------------------------------
>
>                 Key: KAFKA-15849
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15849
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Dongnuo Lyu
>            Assignee: Dongnuo Lyu
>            Priority: Major
>             Fix For: 3.7.0
>
>
Re: [PR] KAFKA-15849: Fix ListGroups API when runtime partition size is zero [kafka]
dajac merged PR #14785: URL: https://github.com/apache/kafka/pull/14785
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1397156239 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) { } } +void wakeup() { Review Comment: I used the same format as on the `Consumer` interface. Same for my recent change to `wokenup`.
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
cadonna commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1397154783 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable { private final Condition notEmptyCondition; private final IdempotentCloser idempotentCloser = new IdempotentCloser(); +private final AtomicBoolean wakedUp = new AtomicBoolean(false); Review Comment: Oh, boy! How embarrassing!
Re: [PR] KAFKA-14133: Migrate ProcessorStateManagerTest and StreamThreadTest to Mockito [kafka]
cadonna commented on code in PR #13932: URL: https://github.com/apache/kafka/pull/13932#discussion_r1396975782 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ## @@ -125,11 +122,11 @@ public class ProcessorStateManagerTest { private OffsetCheckpoint checkpoint; private StateDirectory stateDirectory; -@Mock(type = MockType.NICE) +@Mock private StateStore store; Review Comment: My IDE tells me that this field is never used. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ## @@ -361,41 +339,30 @@ public void shouldRecycleStoreAndReregisterChangelog() { assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.getStore(persistentStoreName), equalTo(store)); -reset(context, store); -context.uninitialize(); -expect(store.name()).andStubReturn(persistentStoreName); -replay(context, store); +when(store.name()).thenReturn(persistentStoreName); stateMgr.registerStateStores(singletonList(store), context); -verify(context, store); +verify(context, times(2)).uninitialize(); assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); } @Test public void shouldClearStoreCache() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); -reset(storeMetadata); -final CachingStore store = EasyMock.createMock(CachingStore.class); - expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition); -expect(storeMetadata.store()).andStubReturn(store); -expect(store.name()).andStubReturn(persistentStoreName); -store.clearCache(); Review Comment: I am missing this verification with Mockito and I assume this is the most important one given the name of the test. 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ## @@ -1430,16 +1385,11 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedE @Test public void shouldShutdownTaskManagerOnClose() { -final Consumer consumer = EasyMock.createNiceMock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); -expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata); - expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty()); -EasyMock.replay(consumerGroupMetadata); -final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); -taskManager.shutdown(true); -EasyMock.expectLastCall(); -EasyMock.replay(taskManager, consumer); +when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); +final TaskManager taskManager = mock(TaskManager.class); +doNothing().when(taskManager).shutdown(true); Review Comment: Please change this to verification instead of a stub. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ## @@ -766,14 +750,10 @@ public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws Interrupt mockTime ); -final Consumer mockConsumer = EasyMock.createNiceMock(Consumer.class); - expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty()); Review Comment: You cannot remove this stub without replacement. In this way a poll on the consumer will return `null`. Some tests are failing because of that. You haven't seen failing them because of the removal of `@Parametrized`. 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ## @@ -2701,32 +2622,20 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { @SuppressWarnings("unchecked") public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath() { final StreamsConfig config = new StreamsConfig(configProps(false)); -final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); +final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); -consumer.subscribe((Collection) anyObject(), anyObject()); -EasyMock.expectLastCall().anyTimes(); -consumer.unsubscribe(); -EasyMock.expectLastCall().anyTimes(); -
Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]
lucasbru commented on code in PR #14779: URL: https://github.com/apache/kafka/pull/14779#discussion_r1397139312 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1102,46 +1207,94 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { subscribeInternal(pattern, Optional.of(listener)); } +/** + * Acquire the light lock and ensure that the consumer hasn't been closed. + * + * @throws IllegalStateException If the consumer has been closed + */ +private void acquireAndEnsureOpen() { +acquire(); +if (this.closed) { +release(); +throw new IllegalStateException("This consumer has already been closed."); +} +} + +/** + * Acquire the light lock protecting this consumer from multithreaded access. Instead of blocking + * when the lock is not available, however, we just throw an exception (since multithreaded usage is not + * supported). + * + * @throws ConcurrentModificationException if another thread already has the lock + */ +private void acquire() { +final Thread thread = Thread.currentThread(); +final long threadId = thread.getId(); +if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) +throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " + +"currentThread(name: " + thread.getName() + ", id: " + threadId + ")" + +" otherThread(id: " + currentThread.get() + ")" +); +refCount.incrementAndGet(); +} + +/** + * Release the light lock protecting the consumer from multithreaded access. + */ +private void release() { +if (refCount.decrementAndGet() == 0) +currentThread.set(NO_CURRENT_THREAD); +} + private void subscribeInternal(Pattern pattern, Optional listener) { -maybeThrowInvalidGroupIdException(); -if (pattern == null || pattern.toString().isEmpty()) -throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? 
+acquireAndEnsureOpen(); Review Comment: I took a tiny bit of artistic freedom here to make the lock handling more consistent. I think this is still at the level where we can diverge from the original consumer, but if you feel strongly about it, we can also check for the lock after checking the `groupId`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
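The light lock quoted in the diff above can be tried out in isolation with a minimal sketch. It follows the `acquire()`/`release()` logic shown in the review, but `LightLockSketch`, the `isHeld()` helper, and the value `-1` for `NO_CURRENT_THREAD` are assumptions made for this standalone example; the closed-consumer check from `acquireAndEnsureOpen()` is left out.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of the light lock from the diff; class name, isHeld()
// and the NO_CURRENT_THREAD value are assumptions for illustration.
final class LightLockSketch {
    private static final long NO_CURRENT_THREAD = -1L;

    // Id of the thread currently inside the guarded object, or NO_CURRENT_THREAD.
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    // Nesting depth, so the owning thread can re-enter.
    private final AtomicInteger refCount = new AtomicInteger(0);

    // Never blocks: a second thread gets an exception instead of waiting.
    void acquire() {
        final Thread thread = Thread.currentThread();
        final long threadId = thread.getId();
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("Not safe for multi-threaded access. "
                + "currentThread(name: " + thread.getName() + ", id: " + threadId + ")"
                + " otherThread(id: " + currentThread.get() + ")");
        refCount.incrementAndGet();
    }

    // The lock is only given up once the outermost acquire() is released.
    void release() {
        if (refCount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD);
    }

    boolean isHeld() {
        return currentThread.get() != NO_CURRENT_THREAD;
    }
}
```

Because of the reference count, the owning thread may call `acquire()` more than once without failing, which matches the remark elsewhere in this review that double-locking is fine; only a different thread is rejected.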
[jira] [Assigned] (KAFKA-15815) JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled
[ https://issues.apache.org/jira/browse/KAFKA-15815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashwin Pankaj reassigned KAFKA-15815:
-------------------------------------
    Assignee: Ashwin Pankaj

> JsonRestServer leaks sockets via HttpURLConnection when keep-alive enabled
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-15815
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15815
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.6.0
>            Reporter: Greg Harris
>            Assignee: Ashwin Pankaj
>            Priority: Minor
>
> By default HttpURLConnection has keep-alive enabled, which allows a single
> HttpURLConnection to be left open in order to be re-used for later requests.
> This means that despite JsonRestServer calling `close()` on the relevant
> InputStream, and calling `disconnect()` on the connection itself, the
> HttpURLConnection does not call `close()` on the underlying socket.
> This affects the Trogdor AgentTest and CoordinatorTest suites, where most of
> the methods make HTTP requests using the JsonRestServer. The effect is that
> ~32 sockets are leaked per test run, all remaining in the CLOSE_WAIT state
> (half closed) after the test. This is because the JettyServer has correctly
> closed the connections, but the HttpURLConnection has not.
> There does not appear to be a way to locally override the HttpURLConnection's
> behavior in this case, and only disabling keep-alive overall (via the system
> property `http.keepAlive=false`) seems to resolve the socket leaks.
> To prevent the leaks, we can move JsonRestServer to an alternative HTTP
> implementation, perhaps the jetty-client that Connect uses, or disable
> keepAlive during tests.
Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]
lucasbru commented on code in PR #14779: URL: https://github.com/apache/kafka/pull/14779#discussion_r1397125331 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ## @@ -112,6 +112,12 @@ public void testInvalidGroupId() { assertThrows(InvalidGroupIdException.class, () -> consumer.committed(new HashSet<>())); } +@Test +public void testFailOnClosedConsumer() { +consumer.close(); +assertThrows(IllegalStateException.class, consumer::assignment); Review Comment: Done
Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]
lucasbru commented on code in PR #14779: URL: https://github.com/apache/kafka/pull/14779#discussion_r1397125030 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1068,12 +1168,17 @@ public void onComplete(Map offsets, Exception @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { -backgroundEventProcessor.process(); +acquireAndEnsureOpen(); Review Comment: Yeah, you are right. While double-locking is fine, removed the lock. The public modifier here is just an artifact of interfaces not having package-private members.
[jira] [Updated] (KAFKA-15851) broker under replicated due to error java.nio.BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-15851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wangliucheng updated KAFKA-15851: - Attachment: (was: server.log) > broker under replicated due to error java.nio.BufferOverflowException > - > > Key: KAFKA-15851 > URL: https://issues.apache.org/jira/browse/KAFKA-15851 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.2 > Environment: Kafka Version: 3.3.2 > Deployment mode: zookeeper >Reporter: wangliucheng >Priority: Major > Attachments: p1.png > > > In my kafka cluster, kafka update 2.0 to 3.3.2 version > {*}first start failed{*}, because the same directory was configured > The error is as follows: > > {code:java} > [2023-11-16 10:04:09,952] ERROR (main kafka.Kafka$ 159) Exiting Kafka due to > fatal exception during startup. > java.lang.IllegalStateException: Duplicate log directories for > skydas_sc_tdevirsec-12 are found in both > /data01/kafka/log/skydas_sc_tdevirsec-12 and > /data07/kafka/log/skydas_sc_tdevirsec-12. It is likely because log directory > failure happened while broker was replacing current replica with future > replica. Recover broker from this failure by manually deleting one of the two > directories for this partition. It is recommended to delete the partition in > the log directory that is known to have failed recently. 
> at kafka.log.LogManager.loadLog(LogManager.scala:305) > at kafka.log.LogManager.$anonfun$loadLogs$14(LogManager.scala:403) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2023-11-16 10:04:09,953] INFO (kafka-shutdown-hook kafka.server.KafkaServer > 66) [KafkaServer id=1434] shutting down {code} > > > *second,* remove /data07/kafka/log in log.dirs and start kafka also reported > an error : > > {code:java} > [2023-11-16 10:13:10,713] INFO (ReplicaFetcherThread-3-1008 > kafka.log.UnifiedLog 66) [UnifiedLog partition=ty_udp_full-60, > dir=/data04/kafka/log] Rolling new log segment (log_size = > 755780551/1073741824}, offset_index_size = 2621440/2621440, time_index_size = > 1747626/1747626, inactive_time_ms = 2970196/60480). 
> [2023-11-16 10:13:10,714] ERROR (ReplicaFetcherThread-3-1008 > kafka.server.ReplicaFetcherThread 76) [ReplicaFetcher replicaId=1434, > leaderId=1008, fetcherId=3] Unexpected error occurred while processing data > for partition ty_udp_full-60 at offset 2693467479 > java.nio.BufferOverflowException > at java.nio.Buffer.nextPutIndex(Buffer.java:555) > at java.nio.DirectByteBuffer.putLong(DirectByteBuffer.java:794) > at kafka.log.TimeIndex.$anonfun$maybeAppend$1(TimeIndex.scala:135) > at kafka.log.TimeIndex.maybeAppend(TimeIndex.scala:114) > at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:510) > at kafka.log.LocalLog.$anonfun$roll$9(LocalLog.scala:529) > at kafka.log.LocalLog.$anonfun$roll$9$adapted(LocalLog.scala:529) > at scala.Option.foreach(Option.scala:437) > at kafka.log.LocalLog.$anonfun$roll$2(LocalLog.scala:529) > at kafka.log.LocalLog.roll(LocalLog.scala:786) > at kafka.log.UnifiedLog.roll(UnifiedLog.scala:1537) > at kafka.log.UnifiedLog.maybeRoll(UnifiedLog.scala:1523) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:919) > at kafka.log.UnifiedLog.appendAsFollower(UnifiedLog.scala:778) > at > kafka.cluster.Partition.doAppendRecordsToFollowerOrFutureReplica(Partition.scala:1121) > at > kafka.cluster.Partition.appendRecordsToFollowerOrFutureReplica(Partition.scala:1128) > at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:121) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:336) > at scala.Option.foreach(Option.scala:437) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:325) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:324) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) > at > 
scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:3
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
lucasbru commented on code in PR #14746: URL: https://github.com/apache/kafka/pull/14746#discussion_r1397061812 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -52,6 +53,8 @@ public class FetchBuffer implements AutoCloseable { private final Condition notEmptyCondition; private final IdempotentCloser idempotentCloser = new IdempotentCloser(); +private final AtomicBoolean wakedUp = new AtomicBoolean(false); Review Comment: nit: wokenUp? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -185,6 +188,16 @@ void awaitNotEmpty(Timer timer) { } } +void wakeup() { Review Comment: nit: `wakeup` vs. `wakeUp` capitalization ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java: ## @@ -166,7 +169,7 @@ void awaitNotEmpty(Timer timer) { try { lock.lock(); -while (isEmpty()) { +while (isEmpty() && !wakedUp.compareAndSet(true, false)) { Review Comment: Nice! I was first convinced that we have an interleaving here that misses the wakeup, but `wakedUp` takes care.
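The interleaving argument in the last review comment can be made concrete with a small sketch of a buffer whose wait loop also consumes a wakeup flag. Only the loop condition `isEmpty() && !flag.compareAndSet(true, false)` is taken from the diff; `WakeableBufferSketch`, the string queue, the millisecond timeout, and the body of `wakeup()` are assumptions, since the full `FetchBuffer` change is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; only the loop condition mirrors the reviewed diff.
final class WakeableBufferSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmptyCondition = lock.newCondition();
    private final Queue<String> items = new ArrayDeque<>();
    private final AtomicBoolean wokenUp = new AtomicBoolean(false);

    void add(String item) {
        lock.lock();
        try {
            items.add(item);
            notEmptyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Assumed implementation: remember the wakeup, then signal any waiter.
    void wakeup() {
        wokenUp.set(true);
        lock.lock();
        try {
            notEmptyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if data is available, false after a wakeup or a timeout.
    boolean awaitNotEmpty(long timeoutMs) throws InterruptedException {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // compareAndSet(true, false) both observes and clears the flag, so a
            // wakeup issued before the waiter arrived still ends the loop once.
            while (items.isEmpty() && !wokenUp.compareAndSet(true, false)) {
                if (remainingNanos <= 0)
                    return false;
                remainingNanos = notEmptyCondition.awaitNanos(remainingNanos);
            }
            return !items.isEmpty();
        } finally {
            lock.unlock();
        }
    }
}
```

In this sketch a wakeup that arrives before the waiter is caught by the flag, and one that arrives while the waiter holds the lock is delivered by `signalAll()` only after the waiter has started awaiting, so neither ordering loses the wakeup.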
[PR] MINOR: Expose earliest local timestamp via the GetOffsetShell [kafka]
clolov opened a new pull request, #14788: URL: https://github.com/apache/kafka/pull/14788 With the introduction of tiered storage the ListOffsets API can now return the earliest local timestamp. The purpose of this pull request is to enable the Kafka tools to get this information in a similar way as the earliest, latest and maximum timestamps.
[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records
[ https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787128#comment-17787128 ]

Andrew Schofield commented on KAFKA-15836:

Yes, I would say so. I think I'll have a PR on trunk soon.

> KafkaConsumer subscribes to multiple topics does not respect max.poll.records
>
> Key: KAFKA-15836
> URL: https://issues.apache.org/jira/browse/KAFKA-15836
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.6.0
> Reporter: Philip Nee
> Assignee: Andrew Schofield
> Priority: Blocker
> Labels: consumer
> Fix For: 3.6.1
>
> We discovered that when KafkaConsumer subscribes to multiple topics with max.poll.records configured, max.poll.records is not properly respected for all poll() invocations.
>
> I was able to reproduce it with the AK example. Here is how I ran my tests: [https://github.com/apache/kafka/pull/14772]
>
> 1. Start zookeeper and kafka server (or kraft mode should be fine too)
> 2. Run: examples/bin/java-producer-consumer-demo.sh 1000
> 3. Polled records > 400 will be printed to stdout
>
> Here is what the program does:
> The producer produces a large number of records to multiple topics. We configure the consumer with max.poll.records = 400 and subscribe to multiple topics. The consumer polls, and the number of returned records can sometimes be larger than 400.
>
> This is an issue in AK 3.6 but 3.5 was fine.
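To make the expected behaviour concrete, the following is a small self-contained simulation of a poll loop that shares one record budget across all buffered partitions. `MaxPollRecordsSketch` and its methods are hypothetical names for illustration only; this is not the consumer's fetch code and not the fix for this ticket. Partition names and offsets mirror the scenario described in the test comment of PR #14789 (tp0 holds 1,2,3; tp1 holds 6,7,8; the cap is 2).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: one budget of maxPollRecords per poll(),
// shared by every partition that has buffered records.
final class MaxPollRecordsSketch {
    private final int maxPollRecords;
    private final Map<String, Deque<Long>> buffered = new LinkedHashMap<>();

    MaxPollRecordsSketch(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    void buffer(String partition, long... offsets) {
        Deque<Long> queue = buffered.computeIfAbsent(partition, p -> new ArrayDeque<>());
        for (long offset : offsets) {
            queue.add(offset);
        }
    }

    List<Long> poll() {
        List<Long> out = new ArrayList<>();
        for (Deque<Long> queue : buffered.values()) {
            // Drain this partition only while the shared budget has room.
            while (out.size() < maxPollRecords && !queue.isEmpty()) {
                out.add(queue.poll());
            }
            if (out.size() == maxPollRecords) {
                break;
            }
        }
        return out;
    }
}
```

With `buffer("tp0", 1, 2, 3)`, `buffer("tp1", 6, 7, 8)` and a cap of 2, three successive `poll()` calls return `[1, 2]`, `[3, 6]` and `[7, 8]`, so no call exceeds the cap even when a poll spans two partitions.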
Re: [PR] KAFKA-15826: Close consumer when sink task is cancelled [kafka]
yashmayya commented on code in PR #14762: URL: https://github.com/apache/kafka/pull/14762#discussion_r1397042412

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java: ##
@@ -163,6 +163,12 @@ public void initialize(TaskConfig taskConfig) {
 }
 }
+@Override
+public void cancel() {
+super.cancel();
+Utils.closeQuietly(consumer, "consumer");

Review Comment:
```suggestion
Utils.closeQuietly(consumer, "consumer for sink task: " + id);
```
nit

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ##
@@ -767,11 +775,7 @@ public void testResetSinkConnectorOffsetsZombieSinkTasks() throws Exception {
 connect.kafka().produce(TOPIC, 0, "key", "value");
 }
-// Configure a sink connector whose sink task blocks in its stop method
-Map connectorConfigs = new HashMap<>();
-connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
-connectorConfigs.put(TOPICS_CONFIG, TOPIC);
-connectorConfigs.put("block", "Task::stop");
+Map connectorConfigs = baseSinkConnectorConfigs();

Review Comment: Same as above, `baseSinkConnectorConfigs()` can be used directly in the call to `configureConnector`

## connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java: ##
@@ -408,27 +412,30 @@ public void testAlterSinkConnectorOffsetsZombieSinkTasks() throws Exception {
 connect.kafka().produce(TOPIC, 0, "key", "value");
 }
-// Configure a sink connector whose sink task blocks in its stop method
-Map connectorConfigs = new HashMap<>();
-connectorConfigs.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSinkConnector.class.getName());
-connectorConfigs.put(TOPICS_CONFIG, TOPIC);
-connectorConfigs.put("block", "Task::stop");
+Map connectorConfigs = baseSinkConnectorConfigs();

Review Comment: This can be used directly now (i.e. without defining a separate local variable)
[jira] [Assigned] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records
[ https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Schofield reassigned KAFKA-15836:

Assignee: Andrew Schofield (was: Kirk True)
[jira] [Assigned] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint
[ https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kamal Chandraprakash reassigned KAFKA-14877:

Assignee: (was: Kamal Chandraprakash)

> refactor InMemoryLeaderEpochCheckpoint
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Priority: Minor
> Fix For: 3.7.0
>
> follow up with this comment:
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
clolov commented on PR #14787: URL: https://github.com/apache/kafka/pull/14787#issuecomment-1816112742 I will aim to provide a review today!
Re: [PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
kamalcph commented on code in PR #14787: URL: https://github.com/apache/kafka/pull/14787#discussion_r1397045499

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -625,9 +626,10 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) throws RemoteStorageExcepti
 // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
 // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
 // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
-copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
+copiedOffsetOption = Optional.of(findHighestRemoteOffset(topicIdPartition, log));
 logger.info("Found the highest copiedRemoteOffset: {} for partition: {} after becoming leader, " +
 "leaderEpoch: {}", copiedOffsetOption, topicIdPartition, leaderEpoch);
+copiedOffsetOption.ifPresent(epochAndOffset -> log.updateHighestOffsetInRemoteStorage(epochAndOffset.offset()));

Review Comment: After broker restart, if there are no more segments to upload, then the `copiedOffset` might be stale. It's good to update the `highestOffsetInRemoteStorage` in the UnifiedLog once we compute it.
[PR] KAFKA-15241: Compute tiered copied offset by keeping the respective epochs in scope [kafka]
kamalcph opened a new pull request, #14787: URL: https://github.com/apache/kafka/pull/14787

`findHighestRemoteOffset` does not take into account the leader-epoch end offset. This can cause log divergence between the local and remote log segments when there is unclean leader election. To handle it correctly, the logic to find the highest remote offset can be updated to:

```
find-highest-remote-offset = min(end-offset-for-epoch-in-the-checkpoint, highest-remote-offset-for-epoch)
```

Discussion thread: https://github.com/apache/kafka/pull/14004#discussion_r1266864272

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
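The `min(...)` rule in the PR description can be sketched as follows. This is an illustration under stated assumptions, not the code in `RemoteLogManager`: the class name, the two maps, and the `OptionalLong` return type are hypothetical, and both offsets are assumed to use the same convention (for example, both inclusive). The walk from the latest epoch backwards follows the code comment quoted in the review of this PR, which says earlier epochs are checked until an entry is found.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;

// Hypothetical sketch of the rule from the PR description; names are assumptions.
final class HighestRemoteOffsetSketch {

    /**
     * @param epochEndOffsets      leader epoch -> end offset recorded in the checkpoint
     * @param remoteHighestByEpoch leader epoch -> highest offset found in remote storage
     */
    static OptionalLong findHighestRemoteOffset(NavigableMap<Integer, Long> epochEndOffsets,
                                                Map<Integer, Long> remoteHighestByEpoch) {
        // Start at the latest epoch and fall back to earlier ones until
        // remote storage has an entry for the epoch.
        for (Map.Entry<Integer, Long> entry : epochEndOffsets.descendingMap().entrySet()) {
            Long remoteHighest = remoteHighestByEpoch.get(entry.getKey());
            if (remoteHighest != null) {
                // Cap the remote offset by the epoch's end offset so a diverged
                // remote segment cannot push the copied offset past the local lineage.
                return OptionalLong.of(Math.min(entry.getValue(), remoteHighest));
            }
        }
        return OptionalLong.empty();
    }
}
```

For example, with checkpoint end offsets `{0: 100, 1: 150, 2: 200}` and a remote entry only for epoch 1 at offset 180, the result is 150 rather than 180.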
Re: [PR] KAFKA-15799 Handle full metadata updates on ZK brokers [kafka]
mimaison commented on PR #14719: URL: https://github.com/apache/kafka/pull/14719#issuecomment-1816083065 Can we now close https://issues.apache.org/jira/browse/KAFKA-15799?
[jira] [Commented] (KAFKA-15836) KafkaConsumer subscribes to multiple topics does not respect max.poll.records
[ https://issues.apache.org/jira/browse/KAFKA-15836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787118#comment-17787118 ]

Mickael Maison commented on KAFKA-15836:

This is marked as a blocker for 3.6.1. We're hoping to make this release in the next couple of weeks. Do you think you can provide a fix by then?
Re: [PR] KAFKA-15327: ensure the commit manager commit on close [kafka]
philipnee commented on PR #14710: URL: https://github.com/apache/kafka/pull/14710#issuecomment-1816006917 Hi @lucasbru @cadonna - If you get a chance, could you take a look at the PR? Thanks!
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816006382 > Do you think it is worth adding a ticket about using the BackgroundEventQueue for the fetches? Let me know if you have a concrete idea about how to implement this. Let me think about it.
Re: [PR] KAFKA-15555: Ensure wakeups are handled correctly in poll() [kafka]
cadonna commented on PR #14746: URL: https://github.com/apache/kafka/pull/14746#issuecomment-1816005251 > I wonder if we could just use a BlockingQueue for the fetchBuffer because fetchBuffer.poll(time) blocks until non-empty or timeout. As far as I see from the javadocs of a `BlockingQueue` there is no way to wake up from `poll()` other than interrupting the thread. I think signalling on a condition variable is safer than interrupting a thread since its scope is well-defined.
Re: [PR] KAFKA-15833: Restrict Consumer API to be used from one thread [kafka]
cadonna commented on PR #14779: URL: https://github.com/apache/kafka/pull/14779#issuecomment-1815989094 I agree with @philipnee that testing the locking behavior would be great. However, I also agree with @lucasbru that testing the locking behavior is quite hard. Given the tight deadline for the next release, I propose to create a newbie ticket for testing the blocking behavior and do that in a follow-up PR. That said, I think some methods might be testable without any additional tooling. For example, `poll()` can be blocked waiting for the fetches in the fetch buffer and then be unblocked once the locking is verified. I do not know if we can do something similar for other public methods. In any case, I think we should make a compromise and merge this PR w/o specific tests and do the testing in a follow-up PR (Note: you will not hear/read this sentence often from me).
Re: [PR] HOTFIX: Fix compilation error in ReplicaManagerConcurrencyTest for Scala 2.12 [kafka]
dajac merged PR #14786: URL: https://github.com/apache/kafka/pull/14786
Re: [PR] HOTFIX: Fix compilation error in ReplicaManagerConcurrencyTest for Scala 2.12 [kafka]
dajac commented on PR #14786: URL: https://github.com/apache/kafka/pull/14786#issuecomment-1815931886 Thanks @lucasbru! The compilation step has succeeded for all builds and I have also confirmed that the test passes locally with Scala 2.12 and 2.13. Therefore, I will go ahead and merge it to trunk to unblock all the other PRs.
Re: [PR] KAFKA-15174: Ensure CommitAsync propagate the exception to the user [kafka]
lucasbru commented on PR #14680: URL: https://github.com/apache/kafka/pull/14680#issuecomment-1815926638 Scala compilation in 2.12 is broken on master, which is fixed in https://github.com/apache/kafka/pull/14786, otherwise this is looking good
[jira] [Assigned] (KAFKA-14507) Add ConsumerGroupPrepareAssignment API
[ https://issues.apache.org/jira/browse/KAFKA-14507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zihao Lin reassigned KAFKA-14507:

Assignee: Zihao Lin

> Add ConsumerGroupPrepareAssignment API
>
> Key: KAFKA-14507
> URL: https://issues.apache.org/jira/browse/KAFKA-14507
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Assignee: Zihao Lin
> Priority: Major