[jira] [Created] (KAFKA-15893) Bump MetadataVersion for directory assignments
Igor Soarez created KAFKA-15893: --- Summary: Bump MetadataVersion for directory assignments Key: KAFKA-15893 URL: https://issues.apache.org/jira/browse/KAFKA-15893 Project: Kafka Issue Type: Sub-task Reporter: Igor Soarez -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
s7pandey commented on code in PR #14605: URL: https://github.com/apache/kafka/pull/14605#discussion_r1404742800 ## streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java: ## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.test; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; +import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +public class MockFixedKeyProcessorContextTest { Review Comment: Definitely! Do you think I should just add some specific FixedKeyProcessor tests to the MockProcessorContextAPITest file since it is mostly the same? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
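A rough sketch of the kind of test under discussion, for context. This assumes the proposed `MockFixedKeyProcessorContext` mirrors `MockProcessorContext`'s `forwarded()` API, and it uses `InternalFixedKeyRecordFactory` to build a `FixedKeyRecord` (which has no public constructor); names outside this PR's imports are illustrative only.

```java
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

public class FixedKeyProcessorSketchTest {

    @Test
    public void shouldCaptureForwardedRecords() {
        final MockFixedKeyProcessorContext<String, Long> context = new MockFixedKeyProcessorContext<>();

        // A trivial processor that increments the value and forwards the record.
        final FixedKeyProcessor<String, Long, Long> processor = new FixedKeyProcessor<String, Long, Long>() {
            private FixedKeyProcessorContext<String, Long> ctx;

            @Override
            public void init(final FixedKeyProcessorContext<String, Long> context) {
                this.ctx = context;
            }

            @Override
            public void process(final FixedKeyRecord<String, Long> record) {
                ctx.forward(record.withValue(record.value() + 1));
            }
        };

        processor.init(context);
        processor.process(InternalFixedKeyRecordFactory.create(new Record<>("key", 41L, 0L)));

        // Assumes the mock captures forwards the way MockProcessorContext does.
        assertThat(context.forwarded().size(), is(1));
    }
}
```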
Re: [PR] MINOR: Add retry for CI [kafka]
ex172000 commented on PR #14828: URL: https://github.com/apache/kafka/pull/14828#issuecomment-1826221078 > Thanks for the PR. Note that the CI configuration already sets retries; we don't need a CI server configuration. Is the CI retry a global setting? Would it make more sense to just retry the failed ones from Gradle? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404635960 ## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ## @@ -36,14 +36,15 @@ import scala.collection.Seq */ abstract class BaseConsumerTest extends AbstractConsumerTest { - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) - def testSimpleConsumption(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = { val numRecords = 1 val producer = createProducer() val startingTimestamp = System.currentTimeMillis() sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
AndrewJSchofield commented on code in PR #14811: URL: https://github.com/apache/kafka/pull/14811#discussion_r1404634302 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3759,6 +3760,21 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } + // Just a placeholder for now. + def handleListClientMetricsResources(request: RequestChannel.Request): CompletableFuture[Unit] = { Review Comment: Just giving the option of asynchronous completion. However, since I don't strictly need it now, I am simplifying it to `Unit`. ## clients/src/main/java/org/apache/kafka/clients/admin/ListClientMetricsResourcesResult.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + + *http://www.apache.org/licenses/LICENSE-2.0 + + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +import java.util.Collection; + +/** + * The result of the {@link Admin#listClientMetricsResources()} call. + * + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListClientMetricsResourcesResult { +private final KafkaFuture<Collection<ClientMetricsResourceListing>> future; + + ListClientMetricsResourcesResult(KafkaFuture<Collection<ClientMetricsResourceListing>> future) { +this.future = future; +} + +/** + * Returns a future that yields either an exception, or the full set of client metrics + * listings. + * + * In the event of a failure, the future yields nothing but the first exception which + * occurred. + */ +public KafkaFuture<Collection<ClientMetricsResourceListing>> all() { +final KafkaFutureImpl<Collection<ClientMetricsResourceListing>> result = new KafkaFutureImpl<>(); +this.future.whenComplete((listings, throwable) -> { Review Comment: No. Removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
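For readers following KIP-1000, the intended client-side call shape is roughly as follows. This is a sketch based on the classes in this PR; it assumes `ClientMetricsResourceListing` exposes a `name()` accessor and that the usual bootstrap configuration is supplied.

```java
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ClientMetricsResourceListing;
import org.apache.kafka.clients.admin.ListClientMetricsResourcesResult;

public class ListClientMetricsResourcesExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address

        try (Admin admin = Admin.create(props)) {
            ListClientMetricsResourcesResult result = admin.listClientMetricsResources();
            // all() completes with the listings, or with the first error encountered.
            Collection<ClientMetricsResourceListing> listings = result.all().get();
            listings.forEach(listing -> System.out.println(listing.name()));
        }
    }
}
```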
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
AndrewJSchofield commented on code in PR #14811: URL: https://github.com/apache/kafka/pull/14811#discussion_r1404634074 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -7090,6 +7093,51 @@ private static MemberDescription convertToMemberDescriptions(DescribedGroupMembe assignment); } +@Test +public void testListClientMetricsResources() throws Exception { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +List<ClientMetricsResourceListing> expected = Arrays.asList( +new ClientMetricsResourceListing("one"), +new ClientMetricsResourceListing("two") +); + +ListClientMetricsResourcesResponseData responseData = +new ListClientMetricsResourcesResponseData().setErrorCode(Errors.NONE.code()); + +responseData.clientMetricsResources() +.add(new ListClientMetricsResourcesResponseData.ClientMetricsResource().setName("one")); +responseData.clientMetricsResources() +.add((new ListClientMetricsResourcesResponseData.ClientMetricsResource()).setName("two")); + +env.kafkaClient().prepareResponse( +request -> request instanceof ListClientMetricsResourcesRequest, +new ListClientMetricsResourcesResponse(responseData)); + +ListClientMetricsResourcesResult result = env.adminClient().listClientMetricsResources(); +assertEquals(new HashSet<>(expected), new HashSet<>(result.all().get())); +} +} + +@Test +public void testListClientMetricsResourcesNotSupported() throws Exception { Review Comment: That appears true to me too, but all of the other tests in that source file are the same pattern. I am going for consistency here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1404613145 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java: ## @@ -170,7 +173,30 @@ public boolean isOpen() { @Override public synchronized byte[] get(final Bytes key) { -return physicalStore.get(prefixKeyFormatter.addPrefix(key)); +return get(key, Optional.empty()); +} + +public synchronized byte[] get(final Bytes key, final Snapshot snapshot) { +return get(key, Optional.of(snapshot)); +} + +private synchronized byte[] get(final Bytes key, final Optional<Snapshot> snapshot) { +if (snapshot.isPresent()) { +try (ReadOptions readOptions = new ReadOptions()) { +readOptions.setSnapshot(snapshot.get()); +return physicalStore.get(prefixKeyFormatter.addPrefix(key), readOptions); +} +} else { +return physicalStore.get(prefixKeyFormatter.addPrefix(key)); +} +} + +public Snapshot getSnapshot() { +return physicalStore.db.getSnapshot(); Review Comment: > I think we should push this into `RocksDBStore` class, ie, call `physicalStore.getSnapshot()` here, and extend `RocksDBStore` to track all open snapshots (similar to how we track open iterators and release all open snapshot if the store is closed). Since "Snapshots do not persist across DB restarts ([ref](https://github.com/facebook/rocksdb/wiki/Snapshot))", I think we do not need to track them when closing the store. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
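As background for this thread, the RocksDB JNI snapshot lifecycle looks roughly like the sketch below (not the PR's code). A snapshot pins a consistent view for reads via `ReadOptions`; it is not persisted across restarts, but while the DB is open an unreleased snapshot keeps pinning old SST files and memtable state, which is why explicit release still matters.

```java
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;

public final class SnapshotReadExample {

    // Reads two keys from the same consistent view of the database.
    static byte[][] consistentMultiGet(final RocksDB db, final byte[] key1, final byte[] key2)
            throws RocksDBException {
        final Snapshot snapshot = db.getSnapshot();
        try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot)) {
            final byte[] v1 = db.get(readOptions, key1);
            final byte[] v2 = db.get(readOptions, key2); // sees the same version as v1
            return new byte[][]{v1, v2};
        } finally {
            db.releaseSnapshot(snapshot); // must be released while the DB is open
        }
    }
}
```

Whether release-on-close is worth tracking (the whole DB is torn down at that point anyway) is exactly the open question in this thread.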
Re: [PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]
aliehsaeedii commented on code in PR #14626: URL: https://github.com/apache/kafka/pull/14626#discussion_r1404610275 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -263,6 +266,78 @@ public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) { return null; } +public VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final boolean isAscending) { + +Objects.requireNonNull(key, "key cannot be null"); +validateStoreOpen(); + +final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>(); + +if (toTimestamp < observedStreamTime - historyRetention) { +// history retention exceeded. we still check the latest value store in case the +// latest record version satisfies the timestamp bound, in which case it should +// still be returned (i.e., the latest record version per key never expires). +final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); +if (rawLatestValueAndTimestamp != null) { +final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); +if (recordTimestamp <= toTimestamp) { +// latest value satisfies timestamp bound +queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); +} +} + +// history retention has elapsed and the latest record version (if present) does +// not satisfy the timestamp bound. return null for predictability, even if data +// is still present in segments. +if (queryResults.size() == 0) { +LOG.warn("Returning null for expired get."); +} +return new VersionedRecordIteratorImpl<>(queryResults.listIterator()); +} else { +// take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) +final Snapshot snapshot = latestValueStore.getSnapshot(); +// first check the latest value store +final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key, snapshot); +if (rawLatestValueAndTimestamp != null) { +final long recordTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); +if (recordTimestamp <= toTimestamp) { +queryResults.add(new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), recordTimestamp)); +} +} + +// check segment stores +// consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp} +// but is still valid in query specified time interval. +final List<LogicalKeyValueSegment> segments = segmentStores.segments(Long.MIN_VALUE, toTimestamp, false); +for (final LogicalKeyValueSegment segment : segments) { +final byte[] rawSegmentValue = segment.get(key, snapshot); Review Comment: > I don't think we can pass the `snapshot` we got from latestValueStore into segmentStore -- it's two independent RocksDBs. Seems like they are the same. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Query before creating the internal remote log metadata topic [kafka]
apoorvmittal10 commented on code in PR #14755: URL: https://github.com/apache/kafka/pull/14755#discussion_r1404599341 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java: ## @@ -66,6 +69,28 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() { return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); } +@Test +public void testInternalTopicExists() { +Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); +ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); +try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { +String topic = topicBasedRlmm().config().remoteLogMetadataTopicName(); +boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); +Assertions.assertTrue(doesTopicExist); Review Comment: Thanks for fixing this @kamalcph. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]
kamalcph commented on code in PR #14649: URL: https://github.com/apache/kafka/pull/14649#discussion_r1404590243 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1754,6 +1754,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, leaderEpochCache.foreach(_.clearAndFlush()) producerStateManager.truncateFullyAndStartAt(newOffset) logStartOffset = logStartOffsetOpt.getOrElse(newOffset) +if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(newOffset, LogStartOffsetIncrementReason.SegmentDeletion) Review Comment: > Adding it to truncateFullyAndStartAt means that anything that calls the function does not have to worry about updating the local log start offset every time. My preference is to update the local-log-start-offset in the `truncateFullyAndStartAt` function, as it updates the leader-epoch-checkpoint file, log-start-offset, high-watermark, and log-end-offset. To avoid passing the reason into the `maybeIncrementLocalLogStartOffset` method, we can update the `local-log-start-offset` value directly in the truncateFullyAndStartAt method, since it does the operations under the same `lock`. We followed a similar approach in our local branch: https://sourcegraph.com/github.com/satishd/kafka@24d21694f2f0450e8a8e60e85400de3586fc7069/-/blob/core/src/main/scala/kafka/log/Log.scala?L2385 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]
kamalcph commented on code in PR #14649: URL: https://github.com/apache/kafka/pull/14649#discussion_r1404591766 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1754,6 +1754,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, leaderEpochCache.foreach(_.clearAndFlush()) producerStateManager.truncateFullyAndStartAt(newOffset) logStartOffset = logStartOffsetOpt.getOrElse(newOffset) +_localLogStartOffset = newOffset Review Comment: @nikramakrishnan Can we reuse the same approach? ``` if (remoteLogEnabled()) _localLogStartOffset = newOffset ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]
scholzj commented on PR #14756: URL: https://github.com/apache/kafka/pull/14756#issuecomment-1825984878 Thanks a lot for your help with this @mimaison and @viktorsomogyi -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15464) Allow dynamic reloading of certificates with different DN / SANs
[ https://issues.apache.org/jira/browse/KAFKA-15464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-15464. Fix Version/s: 3.7.0 Resolution: Fixed > Allow dynamic reloading of certificates with different DN / SANs > > > Key: KAFKA-15464 > URL: https://issues.apache.org/jira/browse/KAFKA-15464 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Assignee: Jakub Scholz >Priority: Major > Fix For: 3.7.0 > > > Kafka currently doesn't allow dynamic reloading of keystores when the new key > has a different DN or removes some of the SANs. While it might help to > prevent users from breaking their cluster, in some cases it would be great to > be able to bypass this validation when desired. > More details are in the [KIP-978: Allow dynamic reloading of certificates > with different DN / > SANs|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263429128] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Query before creating the internal remote log metadata topic [kafka]
kamalcph commented on code in PR #14755: URL: https://github.com/apache/kafka/pull/14755#discussion_r1404579784 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java: ## @@ -66,6 +69,28 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() { return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); } +@Test +public void testInternalTopicExists() { +Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); +ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); +try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { +String topic = topicBasedRlmm().config().remoteLogMetadataTopicName(); +boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); +Assertions.assertTrue(doesTopicExist); Review Comment: Opened #14840 to fix this test. Ran the test locally repeatedly (100) to ensure that it does not fail this time. Screenshot: https://github.com/apache/kafka/assets/11411249/3912af3d-269f-4a34-966f-4a9096b49f30 PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]
kamalcph opened a new pull request, #14840: URL: https://github.com/apache/kafka/pull/14840 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404560191 ## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ## @@ -36,14 +36,15 @@ import scala.collection.Seq */ abstract class BaseConsumerTest extends AbstractConsumerTest { - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) - def testSimpleConsumption(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = { val numRecords = 1 val producer = createProducer() val startingTimestamp = System.currentTimeMillis() sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) Review Comment: I did consider that. I have already endured the annoyance. I'll take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15892) Flaky test: testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
Apoorv Mittal created KAFKA-15892: - Summary: Flaky test: testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest Key: KAFKA-15892 URL: https://issues.apache.org/jira/browse/KAFKA-15892 Project: Kafka Issue Type: Bug Reporter: Apoorv Mittal h4. Error org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not alter connector offsets. Error response: \{"error_code":500,"message":"Failed to alter consumer group offsets for connector test-connector either because its tasks haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the Connect cluster may need to be restarted to get rid of the zombie sink tasks."} h4. Stacktrace org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not alter connector offsets. Error response: \{"error_code":500,"message":"Failed to alter consumer group offsets for connector test-connector either because its tasks haven't stopped completely yet or the connector was resumed before the request to alter its offsets could be successfully completed. If the connector is in a stopped state, this operation can be safely retried. If it doesn't eventually succeed, the Connect cluster may need to be restarted to get rid of the zombie sink tasks."} at app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.alterConnectorOffsets(EmbeddedConnect.java:614) at app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.alterConnectorOffsets(EmbeddedConnectCluster.java:48) at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:363) at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets(OffsetsApiIntegrationTest.java:287) at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at 
app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15891) Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
Apoorv Mittal created KAFKA-15891: - Summary: Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest Key: KAFKA-15891 URL: https://issues.apache.org/jira/browse/KAFKA-15891 Project: Kafka Issue Type: Bug Reporter: Apoorv Mittal h4. Error org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false> h4. Stacktrace org.opentest4j.AssertionFailedError: Condition not met within timeout 3. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917) at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Query before creating the internal remote log metadata topic [kafka]
apoorvmittal10 commented on code in PR #14755: URL: https://github.com/apache/kafka/pull/14755#discussion_r1404552551 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java: ## @@ -66,6 +69,28 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() { return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); } +@Test +public void testInternalTopicExists() { +Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); +ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); +try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { +String topic = topicBasedRlmm().config().remoteLogMetadataTopicName(); +boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); +Assertions.assertTrue(doesTopicExist); Review Comment: @kamalcph Can we please verify the flakiness of the test introduced in this PR? I can see the test failing intermittently on PR builds; here is a PR build run which reports this test failing: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14632/26/tests/ I have tried to reproduce locally, added `@RepeatedTest(100)` for the test, and can reproduce. Can we please either revert this test or fix it? Screenshot: https://github.com/apache/kafka/assets/2861565/105ae973-7b8e-4f40-9e6a-d34320eb9347 cc: @satishd @junrao - Found this while investigating build test failures for PRs in review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
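For anyone reproducing this locally, the approach described above is plain JUnit 5 repetition; a minimal sketch, with the assertion body standing in for the real check:

```java
import org.junit.jupiter.api.RepeatedTest;

import static org.junit.jupiter.api.Assertions.assertTrue;

class FlakyReproSketch {

    // Runs the method 100 times in one Gradle/IDE invocation, giving an
    // intermittent failure many chances to show up.
    @RepeatedTest(100)
    void testInternalTopicExists() {
        assertTrue(topicExists()); // placeholder for the real admin-client check
    }

    private boolean topicExists() {
        return true; // stand-in
    }
}
```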
Re: [PR] KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) [kafka]
apoorvmittal10 commented on PR #14632: URL: https://github.com/apache/kafka/pull/14632#issuecomment-1825923669 New failing - 12
Build / JDK 11 and Scala 2.13 / testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest (12s)
Build / JDK 11 and Scala 2.13 / testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest (51s)
Build / JDK 11 and Scala 2.13 / testResetSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest (58s)
Build / JDK 21 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest (1m 50s)
Build / JDK 21 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest (1m 42s)
Build / JDK 21 and Scala 2.13 / shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() – org.apache.kafka.streams.integration.NamedTopologyIntegrationTest (1m 9s)
Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest (2m 0s)
Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest (54s)
Build / JDK 8 and Scala 2.12 / testAlwaysSendsAccumulatedOfflineDirs() – kafka.server.BrokerLifecycleManagerTest (<1s)
Build / JDK 8 and Scala 2.12 / testInternalTopicExists() – org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest (12s)
Build / JDK 8 and Scala 2.12 / testListTopicsWithExcludeInternal(String).quorum=zk – org.apache.kafka.tools.TopicCommandIntegrationTest: https://issues.apache.org/jira/browse/KAFKA-15140
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404537279 ## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ## @@ -36,14 +36,15 @@ import scala.collection.Seq */ abstract class BaseConsumerTest extends AbstractConsumerTest { - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft", "kraft+kip848")) - def testSimpleConsumption(quorum: String): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testSimpleConsumption(quorum: String, groupProtocol: String): Unit = { val numRecords = 1 val producer = createProducer() val startingTimestamp = System.currentTimeMillis() sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) +this.consumerConfig.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol) Review Comment: Adding this change to all the tests is a bit annoying. Have you considered adding this to `IntegrationTestHarness.doSetup` or in `createConsumer`? We could infer it like we did with `isNewGroupCoordinatorEnabled()` in the same class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14412: Add ProcessingThread tag interface [kafka]
nicktelford commented on PR #14839: URL: https://github.com/apache/kafka/pull/14839#issuecomment-1825885812 This is part of KIP-892, and has been broken out into a separate PR to reduce the review burden on the main KIP-892 implementation, since it can be merged independently. Note: KIP-892 requires this, because under `READ_COMMITTED`, `RocksDBStore` needs to know whether to return records from the current transaction buffer (for processing threads), or whether to return only records committed to the underlying RocksDB database (interactive query threads). Previously, this was done with an `instanceof StreamThread` check, but with the experimental `TaskExecutorThread` implementation in `trunk`, we need some way to abstract this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-14412: Add ProcessingThread tag interface [kafka]
nicktelford opened a new pull request, #14839: URL: https://github.com/apache/kafka/pull/14839 This interface provides a common supertype for `StreamThread` and `DefaultTaskExecutor.TaskExecutorThread`, which will be used by KIP-892 to differentiate between "processing" threads and interactive query threads. This is needed because `DefaultTaskExecutor.TaskExecutorThread` is `private`, so cannot be seen directly from `RocksDBStore`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
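A rough sketch of the tag-interface approach (the interface and class names follow the PR description; the store-side logic is paraphrased from the `READ_COMMITTED` behaviour described above, not copied from the PR):

```java
// Marker interface: no methods needed, it only tags "processing" threads.
public interface ProcessingThread { }

// StreamThread (and the package-private TaskExecutorThread) would implement the tag.
class SketchStreamThread extends Thread implements ProcessingThread {
    // ... stream thread logic ...
}

// In the store, the current thread's type decides which view to serve.
final class StoreReadPath {
    byte[] get(final byte[] key) {
        if (Thread.currentThread() instanceof ProcessingThread) {
            return getFromTransactionBuffer(key);  // processing threads see uncommitted writes
        }
        return getFromCommittedStore(key);         // IQ threads see only committed data
    }

    private byte[] getFromTransactionBuffer(final byte[] key) { return null; /* sketch */ }

    private byte[] getFromCommittedStore(final byte[] key) { return null; /* sketch */ }
}
```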
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1825859013 @cmccabe @pprovenzano @rondagostino PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez commented on PR #14838: URL: https://github.com/apache/kafka/pull/14838#issuecomment-1825858721 This builds on #14820 (KAFKA-15886: Always specify directories for new partition registrations), so this PR is marked as draft until #14820 is merged. **Reviews**: please focus on the second commit, titled "[KAFKA-15361: Process and persist dir info with broker registration](https://github.com/apache/kafka/pull/14838/commits/aa945193cc705df6e6e904b5ab8a9ecfeca4a38b)" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15361: Process and persist dir info with broker registration [kafka]
soarez opened a new pull request, #14838: URL: https://github.com/apache/kafka/pull/14838 Controllers should process and persist directory information from the broker registration request ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]
viktorsomogyi merged PR #14756: URL: https://github.com/apache/kafka/pull/14756 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-978: Allow dynamic reloading of certificates with different DN / SANs [kafka]
viktorsomogyi commented on PR #14756: URL: https://github.com/apache/kafka/pull/14756#issuecomment-1825833128 There were failing unit tests, but when rerun locally from the PR they all passed, so they are likely unrelated flaky tests. Merging the PR, thanks @scholzj for the contribution! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15801: improve Kafka broker/NetworkClient logging for connectiv… [kafka]
ijuma commented on code in PR #14799: URL: https://github.com/apache/kafka/pull/14799#discussion_r1404471013 ## clients/src/main/java/org/apache/kafka/clients/NetworkClient.java: ## @@ -855,10 +855,11 @@ private void handleTimedOutConnections(List<ClientResponse> responses, long now) List<String> nodes = connectionStates.nodesWithConnectionSetupTimeout(now); for (String nodeId : nodes) { this.selector.close(nodeId); -log.info( -"Disconnecting from node {} due to socket connection setup timeout. " + +log.warn( Review Comment: Why is this a warning? It is expected to happen in a number of normal situations (like cluster rolls). The additional information seems useful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add retry for CI [kafka]
ijuma commented on PR #14828: URL: https://github.com/apache/kafka/pull/14828#issuecomment-1825825013 Thanks for the PR. Note that the CI configuration already sets retries; we don't need a CI server configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Always send cumulative failed dirs in HB request [kafka]
apoorvmittal10 commented on code in PR #14770: URL: https://github.com/apache/kafka/pull/14770#discussion_r1404379306 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest { } @Test - def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = { + def testAlwaysSendsAccumulatedOfflineDirs(): Unit = { val ctx = new RegistrationTestContext(configProperties) val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false) val controllerNode = new Node(3000, "localhost", 8021) ctx.controllerNodeProvider.node.set(controllerNode) val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) -val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData() - .setErrorCode(Errors.NOT_CONTROLLER.code())))) -val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) -val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) +val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))) -val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) -offlineDirs.foreach(manager.propagateDirectoryFailure) - -// start the manager late to prevent a race, and force expectations on the first heartbeat manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty()) - poll(ctx, manager, registration) -val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs() -val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs() -val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs() -assertEquals(offlineDirs, dirs1.asScala.toSet) -assertEquals(offlineDirs, dirs2.asScala.toSet) -assertEquals(Set.empty, dirs3.asScala.toSet) + manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) +poll(ctx, manager, heartbeats(0)).data() +val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs() + + manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) +poll(ctx, manager, heartbeats(2)).data() +val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs() + + manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg")) +poll(ctx, manager, heartbeats(4)).data() +val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs() Review Comment: Thanks for fixing this @kamalcph. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15154: Acquire lock when reading checkQuotas [kafka]
mstepan commented on PR #13969: URL: https://github.com/apache/kafka/pull/13969#issuecomment-1825678364 I would suggest using StampedLock here instead of just the synchronized keyword. Something like https://github.com/apache/kafka/pull/14837/commits -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15154) Potential bug: We don't acquire lock when reading checkQuotas
[ https://issues.apache.org/jira/browse/KAFKA-15154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789452#comment-17789452 ] Maksym Stepanenko commented on KAFKA-15154: --- I believe that instead of just adding a synchronized block, it would be much better to introduce something like a read-write lock (or StampedLock). Like this: https://github.com/apache/kafka/pull/14837/commits > Potential bug: We don't acquire lock when reading checkQuotas > - > > Key: KAFKA-15154 > URL: https://issues.apache.org/jira/browse/KAFKA-15154 > Project: Kafka > Issue Type: Bug > Components: metrics >Reporter: Divij Vaidya >Assignee: Lan Ding >Priority: Major > Labels: newbie > > At Sensor.java line 254, we call `this.metrics.values()`. metrics is not a > thread safe map and that is why we acquire a lock whenever we want to > add/remove entries from it. For example, see the add() and hasMetrics() methods. > However, we don't acquire a lock when calling Sensor#checkQuotas(timeMs). > This could lead to a situation where this metrics map may be left in an > inconsistent state (since it is not thread safe for concurrent read/write > access). > The objective of this task is to validate that what I said above is correct and if > yes, then fix the situation by enclosing this read in a lock. As a stretch > task, we should consider whether we can replace the metrics data structure with one which > allows concurrent reads but exclusive writes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15154: proper StampedLock synchronization added [kafka]
mstepan opened a new pull request, #14837: URL: https://github.com/apache/kafka/pull/14837 All read and write access to the Sensor metrics field should be properly synchronized according to the JMM. The best way to separate reads from writes is to use a ReadWriteLock, or the more advanced StampedLock. All tests are GREEN. ### Committer Checklist (excluded from commit message) - [v] Verify design and implementation - [v] Verify test coverage and CI build status - [v] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
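A minimal sketch of the locking split being proposed, assuming a plain HashMap-backed `metrics` field as described in the JIRA (the quota-evaluation body is elided):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.StampedLock;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;

final class SensorLockingSketch {
    private final Map<MetricName, KafkaMetric> metrics = new HashMap<>();
    private final StampedLock lock = new StampedLock();

    void add(final KafkaMetric metric) {
        final long stamp = lock.writeLock(); // exclusive: mutates the map
        try {
            metrics.put(metric.metricName(), metric);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    void checkQuotas(final long timeMs) {
        final long stamp = lock.readLock(); // shared: concurrent readers proceed in parallel
        try {
            for (final KafkaMetric metric : metrics.values()) {
                // evaluate the quota in metric.config() against the measured value at timeMs
            }
        } finally {
            lock.unlockRead(stamp);
        }
    }
}
```

StampedLock also offers `tryOptimisticRead()`, but optimistic reads are unsafe for iterating a mutable HashMap (a racing writer can corrupt the traversal before `validate()` is ever reached), so a plain read lock is the safer fit for `checkQuotas`.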
Re: [PR] MINOR: Always send cumulative failed dirs in HB request [kafka]
soarez commented on code in PR #14770: URL: https://github.com/apache/kafka/pull/14770#discussion_r1404314260 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -207,34 +207,35 @@ class BrokerLifecycleManagerTest { } @Test - def testOfflineDirsSentUntilHeartbeatSuccess(): Unit = { + def testAlwaysSendsAccumulatedOfflineDirs(): Unit = { val ctx = new RegistrationTestContext(configProperties) val manager = new BrokerLifecycleManager(ctx.config, ctx.time, "offline-dirs-sent-in-heartbeat-", isZkBroker = false) val controllerNode = new Node(3000, "localhost", 8021) ctx.controllerNodeProvider.node.set(controllerNode) val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) -val hb1 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData() - .setErrorCode(Errors.NOT_CONTROLLER.code())))) -val hb2 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) -val hb3 = prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) +val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))) -val offlineDirs = Set(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA"), Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) -offlineDirs.foreach(manager.propagateDirectoryFailure) - -// start the manager late to prevent a race, and force expectations on the first heartbeat manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty()) - poll(ctx, manager, registration) -val dirs1 = poll(ctx, manager, hb1).data().offlineLogDirs() -val dirs2 = poll(ctx, manager, hb2).data().offlineLogDirs() -val dirs3 = poll(ctx, manager, hb3).data().offlineLogDirs() -assertEquals(offlineDirs, dirs1.asScala.toSet) -assertEquals(offlineDirs, dirs2.asScala.toSet) -assertEquals(Set.empty, dirs3.asScala.toSet) + manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) +poll(ctx, manager, heartbeats(0)).data() +val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs() + + manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) +poll(ctx, manager, heartbeats(2)).data() +val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs() + + manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg")) +poll(ctx, manager, heartbeats(4)).data() +val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs() Review Comment: Thanks for reporting this Apoorv. Please see #14836 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on PR #14836: URL: https://github.com/apache/kafka/pull/14836#issuecomment-1825622434 Flakiness detected since #14770 cc @apoorvmittal10 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez opened a new pull request, #14836: URL: https://github.com/apache/kafka/pull/14836 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1404286606 ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -386,7 +388,9 @@ public enum Errors { STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new), MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new), UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new), -UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new); +UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), +UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID", UnknownSubscriptionIdException::new), Review Comment: Done, thanks for pointing out. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
AndrewJSchofield opened a new pull request, #14835: URL: https://github.com/apache/kafka/pull/14835 In the new consumer, `Consumer.poll(Duration timeout)` blocks for the entire duration. If the consumer is joining a group, the poll can begin before an assignment has been received. Because the poll is blocked, it does not notice when partitions are assigned, and it consequently returns no records. The old consumer blocks only for the duration of the heartbeat interval and loops until the poll timeout has passed, so it is able to notice newly received assignments. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
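The looping behaviour described above can be sketched as follows. This is a simplified illustration with hypothetical names, not the consumer's actual implementation; the key point is that each blocking wait is capped at the heartbeat interval, so changes such as a new partition assignment are re-checked before the overall timeout expires:

    import java.time.Duration;

    public final class PollLoopSketch {
        interface Client {
            boolean maybeFetchRecords(Duration maxWait); // returns true once records arrive
        }

        static boolean pollUntil(Client client, Duration timeout, Duration heartbeatInterval) {
            long deadline = System.currentTimeMillis() + timeout.toMillis();
            while (true) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false; // overall poll timeout elapsed with no records
                }
                // Block for at most one heartbeat interval, so assignment changes
                // are noticed between iterations rather than after the full timeout.
                Duration slice = remaining < heartbeatInterval.toMillis()
                        ? Duration.ofMillis(remaining) : heartbeatInterval;
                if (client.maybeFetchRecords(slice)) {
                    return true;
                }
            }
        }
    }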
Re: [PR] KAFKA-15778 & KAFKA-15779: Implement metrics manager (KIP-714) [kafka]
AndrewJSchofield commented on code in PR #14699: URL: https://github.com/apache/kafka/pull/14699#discussion_r1404253943 ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -386,7 +388,9 @@ public enum Errors { STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new), MISMATCHED_ENDPOINT_TYPE(114, "The request was sent to an endpoint of the wrong type.", MismatchedEndpointTypeException::new), UNSUPPORTED_ENDPOINT_TYPE(115, "This endpoint type is not supported yet.", UnsupportedEndpointTypeException::new), -UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new); +UNKNOWN_CONTROLLER_ID(116, "This controller ID is not known.", UnknownControllerIdException::new), +UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID", UnknownSubscriptionIdException::new), Review Comment: Tiny change request. Please put a full stop at the end of the error strings. `"Client sent a push telemetry request with an invalid or outdated subscription ID."`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404233938

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##

@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }

 object BaseConsumerTest {
+  // We want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  //   * KRaft and the generic group protocol
+  //   * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"),
+      Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  //   * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"))

Review Comment:
   OK. Nice and easy to change now I've refactored it. I'll get on it :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
dajac commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404226250

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##

@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }

 object BaseConsumerTest {
+  // We want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  //   * KRaft and the generic group protocol
+  //   * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"),
+      Arguments.of("kraft+kip848", "consumer"))
+  }
+
+  // We want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersZkOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"))
+  }
+
+  // For tests that only work with the generic group protocol, we want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  //   * KRaft and the generic group protocol
+  def getTestQuorumAndGroupProtocolParametersGenericGroupProtocolOnly() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"))

Review Comment:
   We likely need it here too.

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##

@@ -111,6 +115,40 @@ abstract class BaseConsumerTest extends AbstractConsumerTest {
 }

 object BaseConsumerTest {
+  // We want to test the following combinations:
+  //   * ZooKeeper and the generic group protocol
+  //   * KRaft and the generic group protocol
+  //   * KRaft with the new group coordinator enabled and the consumer group protocol
+  def getTestQuorumAndGroupProtocolParametersAll() : java.util.stream.Stream[Arguments] = {
+    java.util.stream.Stream.of(
+      Arguments.of("zk", "generic"),
+      Arguments.of("kraft", "generic"),
+      Arguments.of("kraft+kip848", "consumer"))

Review Comment:
   We also need to test the `generic` with `kraft+kip848`. This is what all the tests with `kraft+kip848` prior to your change did.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on PR #14758: URL: https://github.com/apache/kafka/pull/14758#issuecomment-1825499807 This PR now reflects the changes in KAFKA-14781 and also tests the new consumer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404213789

## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ##

@@ -1947,4 +2082,215 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer2.close()
   }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitAsync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 1
+    val startingTimestamp = System.currentTimeMillis()
+    val cb = new CountConsumerCommitCallback
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitAsync(cb)
+    TestUtils.pollUntilTrue(consumer, () => cb.successCount >= 1 || cb.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 1)
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSync(groupProtocol: String): Unit = {
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    val producer = createProducer()
+    val numRecords = 1
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+    consumer.assign(List(tp).asJava)
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    // No valid fetch position due to the absence of consumer.poll; and therefore no offset was committed to
+    // tp. The committed offset should be null. This is intentional.
+    assertNull(committedOffset.get(tp))
+    assertTrue(consumer.assignment.contains(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndCommitSyncAllConsumed(groupProtocol: String): Unit = {
+    val numRecords = 1
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props)
+    consumer.assign(List(tp).asJava)
+    consumer.seek(tp, 0)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+
+    consumer.commitSync()
+    val committedOffset = consumer.committed(Set(tp).asJava)
+    assertNotNull(committedOffset)
+    assertNotNull(committedOffset.get(tp))
+    assertEquals(numRecords, committedOffset.get(tp).offset())
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsume(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    consumeAndVerifyRecords(consumer = consumer, numRecords, startingOffset = 0, startingTimestamp = startingTimestamp)
+
+    assertEquals(numRecords, consumer.position(tp))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))
+  def testAssignAndConsumeSkippingPosition(groupProtocol: String): Unit = {
+    val numRecords = 10
+
+    val producer = createProducer()
+    val startingTimestamp = System.currentTimeMillis()
+    sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp)
+
+    val props = new Properties()
+    props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol)
+    val consumer = createConsumer(configOverrides = props,
+      configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
+    consumer.assign(List(tp).asJava)
+    val offset = 1
+    consumer.seek(tp, offset)
+    consumeAndVerifyRecords(consumer = consumer, numRecords - offset, startingOffset =
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404208505

## core/src/test/scala/kafka/utils/TestInfoUtils.scala: ##

@@ -39,6 +39,12 @@ object TestInfoUtils {
       } else {
         throw new RuntimeException(s"Unknown quorum value")
       }
+    } else if (testInfo.getDisplayName().contains("groupProtocol=")) {

Review Comment:
   I've replaced this in the new commit.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15544: Enable integration tests for new consumer [kafka]
AndrewJSchofield commented on code in PR #14758: URL: https://github.com/apache/kafka/pull/14758#discussion_r1404207856

## core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala: ##

@@ -34,13 +35,15 @@ import scala.collection.Seq
  */
 abstract class BaseConsumerTest extends AbstractConsumerTest {

-  @Test
-  def testSimpleConsumption(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("generic", "consumer"))

Review Comment:
   This has been replaced with a `MethodSource` that is capable of returning whatever combination we want.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
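For context, the difference between the two JUnit 5 annotations mentioned above is that `@ValueSource` varies a single parameter, whereas `@MethodSource` can supply arbitrary tuples of arguments. A generic illustration (not the Kafka test code itself) might look like this:

    import java.util.stream.Stream;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.MethodSource;

    import static org.junit.jupiter.api.Assertions.assertNotNull;

    class QuorumProtocolMatrixTest {

        // Each Arguments row becomes one invocation of the test method below.
        static Stream<Arguments> quorumAndProtocol() {
            return Stream.of(
                Arguments.of("zk", "generic"),
                Arguments.of("kraft", "generic"),
                Arguments.of("kraft+kip848", "consumer"));
        }

        @ParameterizedTest
        @MethodSource("quorumAndProtocol")
        void runsForEveryCombination(String quorum, String groupProtocol) {
            assertNotNull(quorum);
            assertNotNull(groupProtocol);
        }
    }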
Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]
dajac commented on code in PR #14432: URL: https://github.com/apache/kafka/pull/14432#discussion_r1404037483

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##

@@ -691,6 +708,10 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
     if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
         throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
     }
+} else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+    throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
+    throwIfNull(request.instanceId(), "InstanceId can't be null for Static Member. GroupId: "

Review Comment:
   I would rather use `InstanceId can't be null.` here in order to be consistent with the other error messages.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##

@@ -751,6 +772,53 @@
+    /**
+     * Validates and throws an error when the validation fails for static member.
+     * @param groupId     The group id
+     * @param instanceId  The instance id
+     * @param member      The existing static member in the group.
+     * @param memberEpoch The member epoch with which the static member sends heartbeat.
+     * @param memberId    The member id with which the member joins now.
+     *
+     * @throws UnknownMemberIdException if member sends heartbeat with a non-zero epoch and no static member exists for
+     * the instance id.
+     * @throws org.apache.kafka.common.errors.FencedInstanceIdException If member joins with non-zero epoch but there
+     * already exists a static member with a different memberId.
+     * @throws org.apache.kafka.common.errors.UnreleasedInstanceIdException A new member is trying to leave the group
+     * but the existing static member hasn't requested leaving the group.
+     */
+    private void throwIfStaticMemberValidationFails(

Review Comment:
   I suppose that we could remove this one now. Could we?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##

@@ -849,21 +922,53 @@ private CoordinatorResult consumerGr
     // Get or create the member.
     if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-    final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
-    throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
-
-    if (memberEpoch == 0) {
-        log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+    ConsumerGroupMember member;
+    ConsumerGroupMember.Builder updatedMemberBuilder;
+    boolean staticMemberReplaced = false;
+    if (instanceId == null) {
+        member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
+        if (createIfNotExists) {
+            log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
+        }
+        updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+    } else {
+        member = group.staticMember(instanceId);
+        if (memberEpoch == 0) {
+            // A new static member joins or the existing static member rejoins.
+            if (member == null) {
+                // New static member.
+                member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
+                log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group", groupId, memberId, instanceId);
+            } else {
+                // Static member rejoins with a different member id so it should replace
+                // the previous instance iff the previous member had sent a Leave group.
+                throwIfInstanceIdIsUnreleased(groupId, memberId, instanceId, member);
+                // Replace the current member.
+                staticMemberReplaced = true;
+                updatedMemberBuilder = new ConsumerGroupMember.Builder(memberId)
+                    .setAssignedPartitions(member.assignedPartitions());
+                removeMemberAndCancelTimers(records, group.groupId(), member.memberId());
+            }
+        } else {
+            throwIfStaticMemberIsUnknown(member, instanceId);
+            throwIfInstanceIdIsFenced(memberId, instanceId, member);

Review Comment:
   nit: I would put `member` as the first argument to be consistent with the other helpers.

##
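To make the fencing rules under review easier to follow, here is a condensed, hypothetical sketch of the checks being discussed: a heartbeat that carries an instance id and a non-zero epoch must come from the member id currently registered for that instance, otherwise it is rejected. All names and exception types below are simplified stand-ins for the real coordinator code:

    import java.util.Map;

    final class StaticMemberFencingSketch {
        // instanceId -> memberId currently registered for that static instance
        static void validate(Map<String, String> staticMembers,
                             String instanceId, String memberId, int memberEpoch) {
            String registered = staticMembers.get(instanceId);
            if (memberEpoch != 0) {
                // Non-zero epoch: the instance must be known and owned by this member.
                if (registered == null) {
                    throw new IllegalStateException("Unknown instance id " + instanceId);
                }
                if (!registered.equals(memberId)) {
                    throw new IllegalStateException(
                        "Instance id " + instanceId + " is fenced by member " + registered);
                }
            }
            // Epoch 0 with an existing registration replaces the old member only if the
            // previous incarnation already asked to leave; otherwise the join is rejected.
        }
    }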
Re: [PR] KAFKA-15803: Update leader epoch in commitAsync and committed [kafka]
lucasbru merged PR #14817: URL: https://github.com/apache/kafka/pull/14817 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15803: Update leader epoch in commitAsync and committed [kafka]
lucasbru commented on PR #14817: URL: https://github.com/apache/kafka/pull/14817#issuecomment-1825403240 Test failures unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13531: Disable flaky NamedTopologyIntegrationTest [kafka]
lucasbru commented on PR #14830: URL: https://github.com/apache/kafka/pull/14830#issuecomment-1825399087 Test failures are unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-13531: Disable flaky NamedTopologyIntegrationTest [kafka]
lucasbru merged PR #14830: URL: https://github.com/apache/kafka/pull/14830 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15817) Avoid reconnecting to the same IP address if multiple addresses are available
[ https://issues.apache.org/jira/browse/KAFKA-15817?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-15817: --- Fix Version/s: (was: 3.6.1) > Avoid reconnecting to the same IP address if multiple addresses are available > - > > Key: KAFKA-15817 > URL: https://issues.apache.org/jira/browse/KAFKA-15817 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1 >Reporter: Bob Barrett >Assignee: Bob Barrett >Priority: Major > Fix For: 3.7.0 > > > In https://issues.apache.org/jira/browse/KAFKA-12193, we changed the DNS > resolution behavior for clients to re-resolve DNS after disconnecting from a > broker, rather than wait until we iterated over all addresses from a given > resolution. This is useful when the IP addresses have changed between the > connection and disconnection. > However, with the behavior change, this does mean that clients could > potentially reconnect immediately to the same IP they just disconnected from, > if the IPs have not changed. In cases where the disconnection happened > because that IP was unhealthy (such as a case where a load balancer has > instances in multiple availability zones and one zone is unhealthy, or a case > where an intermediate component in the network path is going through a > rolling restart), this will delay the client successfully reconnecting. To > address this, clients should remember the IP they just disconnected from and > skip that IP when reconnecting, as long as the address resolved to multiple > addresses. -- This message was sent by Atlassian Jira (v8.20.10#820010)
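A minimal sketch of the proposed remediation, assuming nothing about Kafka's actual networking classes: remember the address that just failed and prefer a different one on reconnect whenever DNS resolution returned more than one candidate.

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Arrays;
    import java.util.List;

    final class ReconnectAddressPicker {
        private InetAddress lastFailed;

        InetAddress pick(String host) throws UnknownHostException {
            List<InetAddress> resolved = Arrays.asList(InetAddress.getAllByName(host));
            for (InetAddress candidate : resolved) {
                // Only skip the previously failed IP when an alternative exists.
                if (resolved.size() > 1 && candidate.equals(lastFailed)) {
                    continue;
                }
                return candidate;
            }
            return resolved.get(0); // every candidate was skipped; fall back
        }

        void remember(InetAddress failed) {
            this.lastFailed = failed;
        }
    }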