Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
chia7712 commented on code in PR #15897: URL: https://github.com/apache/kafka/pull/15897#discussion_r1597349032 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -34,7 +34,7 @@ import org.junit.jupiter.api.extension.ExtendWith class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) { Review Comment: this test has redundant `ClusterTestDefaults` as the changed value is equal to default value. ## core/src/test/java/kafka/test/ClusterConfig.java: ## @@ -164,7 +166,7 @@ public Map nameTags() { public static Builder defaultBuilder() { return new Builder() -.setType(Type.ZK) +.setTypes(Collections.singleton(Type.ZK)) Review Comment: We have to align the default value with `ClusterTestDefaults`, right? ## core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala: ## @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith @Timeout(120) @ExtendWith(value = Array(classOf[ClusterTestExtensions])) -@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1) Review Comment: please remove `brokers = 1` ## core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala: ## @@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults() Review Comment: ditto ## tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java: ## @@ -49,7 +48,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults() Review Comment: please remove it ## core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java: ## @@ -35,7 +35,7 @@ @Target({TYPE}) @Retention(RUNTIME) public @interface ClusterTestDefaults { -Type clusterType() default Type.ZK; +Type[] clusterTypes() default {Type.ZK, 
Type.KRAFT, Type.CO_KRAFT}; Review Comment: Maybe it should be renamed to `types` instead of `clusterTypes`. We do have a `ClusterType` in testing :) ## core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java: ## @@ -44,7 +43,7 @@ @SuppressWarnings("dontUseSystemExit") @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ALL) +@ClusterTestDefaults() Review Comment: This is redundant if you don't define any values ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -92,7 +91,7 @@ public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { } @ClusterTests({ -@ClusterTest(clusterType = Type.ZK) +@ClusterTest(clusterTypes = {Type.ZK}) Review Comment: Could you remove redundant `ClusterTests`? ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -78,8 +78,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) { } @ClusterTests({ -@ClusterTest(clusterType = Type.ZK), -@ClusterTest(clusterType = Type.KRAFT) +@ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT}), Review Comment: ditto ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -78,8 +78,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) { } Review Comment: This test has redundant `ClusterTestDefaults`. Could you please remove it also? ## tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java: ## @@ -73,7 +73,7 @@ public void testDescribeQuorumReplicationSuccessful() throws InterruptedException ); List outputs = stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList()); -if (cluster.config().clusterType() == Type.CO_KRAFT) +if (cluster.config().clusterTypes().contains(Type.CO_KRAFT)) Review Comment: Could you add a new method to expose the `Type`?
Otherwise, this check is not accurate since users can set multi-types in `ClusterConfig` ## tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java: ## @@ -86,7 +86,7 @@ public void testDescribeQuorumReplicationSuccessful() throws InterruptedException assertEquals(cluster.config().numControllers() - 1, outputs.stream().filter(o -> followerPattern.matcher(o).find()).count()); Pattern observerPattern = Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Observer\\s*"); -if (cluster.config().clusterType() == Type.CO_KRAFT) +if (cluster.config().clusterTypes().contains(Type.CO_KRAFT)) Review Comment: ditto
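chia7712's last point, that `clusterTypes().contains(...)` stops being an accurate check once a config can carry several types, can be shown with a small self-contained model. All names below are hypothetical stand-ins for the test-kit classes (in particular the `type()` accessor on the instance), not the actual API:

```java
import java.util.EnumSet;
import java.util.Set;

public class ClusterTypeSketch {
    // Hypothetical stand-in for kafka.test.annotation.Type.
    enum Type { ZK, KRAFT, CO_KRAFT }

    // A ClusterConfig may list several types, one test invocation per type...
    static class ClusterConfig {
        final Set<Type> types;
        ClusterConfig(Set<Type> types) { this.types = types; }
        Set<Type> clusterTypes() { return types; }
    }

    // ...but each started ClusterInstance runs exactly one of them.
    static class ClusterInstance {
        final ClusterConfig config;
        final Type type; // the type this instance was actually started with
        ClusterInstance(ClusterConfig config, Type type) { this.config = config; this.type = type; }
        ClusterConfig config() { return config; }
        Type type() { return type; } // hypothetical accessor the reviewer asks for
    }

    public static void main(String[] args) {
        ClusterConfig config = new ClusterConfig(EnumSet.of(Type.KRAFT, Type.CO_KRAFT));
        ClusterInstance kraftInstance = new ClusterInstance(config, Type.KRAFT);

        // The config-based check is true even though this instance is not CO_KRAFT:
        System.out.println(kraftInstance.config().clusterTypes().contains(Type.CO_KRAFT)); // true
        // The instance-based check reflects what is actually running:
        System.out.println(kraftInstance.type() == Type.CO_KRAFT); // false
    }
}
```

The point: the config says which types may be generated, while the instance knows which type it is.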
Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]
chia7712 commented on PR #15917: URL: https://github.com/apache/kafka/pull/15917#issuecomment-2105525456 @kamalcph thanks for all your reviews. I have addressed them in https://github.com/apache/kafka/pull/15917/commits/ff2c163633b341a6db9b1d9d838cec350a41ac15 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]
ableegoldman commented on code in PR #15920: URL: https://github.com/apache/kafka/pull/15920#discussion_r1597343428 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -459,6 +468,38 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr } } +private ApplicationState getApplicationState( Review Comment: nit: we don't use "get" in Streams getter names. I guess this isn't exactly a pure getter, but still. On that note, perhaps a better name would be `buildApplicationState`? Also: even though it's all internal, I've been on a crusade to get everyone to write javadocs for methods in the StreamsPartitionAssignor with at least a brief explanation of what it does. It's just a super complicated class that does a lot and often mutates things in a way that isn't obvious, so every little bit of documentation helps ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // compute the assignment of tasks to threads within each client and build the final group assignment +getApplicationState( Review Comment: I know we're just throwing away the return value for now but I'd still do this:

```suggestion
final ApplicationState applicationState = getApplicationState(
```

Otherwise it kind of seems like this method is supposed to be mutating the input parameters (many of the StreamsPartitionAssignor methods work this way so it's good to distinguish when we're just building something vs operating on the passed in structures) ## streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java: ## @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.processor.assignment; -import org.apache.kafka.common.protocol.types.Field.UUID; +import java.util.UUID; Review Comment: good catch!
## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java: ## @@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // compute the assignment of tasks to threads within each client and build the final group assignment Review Comment: nit: this comment should stay above the `#computeNewAssignment` call ## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; + +public class ApplicationStateImpl implements ApplicationState { + +private final AssignmentConfigs assignmentConfigs; +private final Set statelessTasks; +private final Set statefulTasks; +private final Map kafkaStreamsStates; + +public ApplicationStateImpl( +final AssignmentConfigs assignmentConfigs, +final Map kafkaStreamsStates, +final Set statefulTasks, +final Set statelessTasks +) { Review Comment: KafkaStreams formatting for long signatures is (unfortunately) done like this : ```suggestion public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs, final Map kafkaStreamsStates, final Set statefulTasks, final Set statelessTasks) {
Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]
chia7712 merged PR #15904: URL: https://github.com/apache/kafka/pull/15904
Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]
chia7712 commented on PR #15904: URL: https://github.com/apache/kafka/pull/15904#issuecomment-2105442338

|test|jira|
|-|-|
|testReplicateFromLatest|https://issues.apache.org/jira/browse/KAFKA-16383|
|testTaskRequestWithOldStartMsGetsUpdated|https://issues.apache.org/jira/browse/KAFKA-16136|
|testIndexFileAlreadyExistOnDiskButNotInCache|https://issues.apache.org/jira/browse/KAFKA-16704|
|testMigrateTopicDeletions|https://issues.apache.org/jira/browse/KAFKA-16045|
|testReplicateSourceDefault|https://issues.apache.org/jira/browse/KAFKA-15292|
|testFenceMultipleBrokers|https://issues.apache.org/jira/browse/KAFKA-16634|
|testUnregisterBroker|https://issues.apache.org/jira/browse/KAFKA-13966|
|testSyncTopicConfigs|https://issues.apache.org/jira/browse/KAFKA-15945|
|testSeparateOffsetsTopic|https://issues.apache.org/jira/browse/KAFKA-14089|
[jira] [Created] (KAFKA-16704) Fix flaky kafka.log.remote.RemoteIndexCacheTest#testIndexFileAlreadyExistOnDiskButNotInCache
Chia-Ping Tsai created KAFKA-16704: -- Summary: Fix flaky kafka.log.remote.RemoteIndexCacheTest#testIndexFileAlreadyExistOnDiskButNotInCache Key: KAFKA-16704 URL: https://issues.apache.org/jira/browse/KAFKA-16704 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai

java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/kafka-RemoteIndexCacheTest3690189103734187552/R68MBnutRfmqJY66XXFoOA:foo-0/remote-log-index-cache/2147584984_Ma8JCqucS7mqKIHfSSDeow.txnindex.deleted
    at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
    at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at kafka.log.remote.RemoteIndexCacheTest.renameRemoteCacheIndexFileFromDisk$1(RemoteIndexCacheTest.scala:832)
    at kafka.log.remote.RemoteIndexCacheTest.testIndexFileAlreadyExistOnDiskButNotInCache(RemoteIndexCacheTest.scala:851)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)

-- This message was sent by Atlassian Jira (v8.20.10#820010)
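For context on the failure above: `Files.walk` is lazy, so its `FileTreeIterator` re-reads directory entries during traversal, and a file deleted concurrently (here a `.txnindex.deleted` file removed by the cache cleaner) surfaces as a `NoSuchFileException` wrapped in `UncheckedIOException`. One defensive pattern, sketched here with plain JDK types and not necessarily the fix the test will adopt, is to snapshot the listing eagerly before acting on it:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WalkSnapshotSketch {
    // Snapshot the tree eagerly, then act on each path, tolerating files
    // that another thread (e.g. an async cleaner) deletes in between.
    static List<Path> snapshot(Path root) throws IOException {
        try (Stream<Path> s = Files.walk(root)) {
            return s.collect(Collectors.toList()); // traversal finishes before any mutation
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("walk-sketch");
        Path f = Files.createFile(dir.resolve("2147584984_x.txnindex.deleted"));

        List<Path> paths = snapshot(dir);
        // Simulate the cleaner thread removing the file after the snapshot:
        Files.delete(f);
        // Acting on the snapshot no longer trips FileTreeIterator mid-walk;
        // per-entry operations just need to tolerate missing files.
        long stillPresent = paths.stream().filter(Files::exists).count();
        System.out.println(stillPresent); // only the directory itself remains
    }
}
```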
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1597305375 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} + +public short featureLevel() { +return featureLevel; +} + +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +// version 1 depends on metadata.version 3.3-IV0 +if (featureLevel >= 1 && metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0)) +throw new IllegalArgumentException(FEATURE_NAME + " could not be set to " + featureLevel + +" because it depends on metadata.version=14 (" + MetadataVersion.IBP_3_3_IV0 + ")"); +} + +public static TestFeatureVersion metadataVersionMapping(MetadataVersion metadataVersion) { +if (metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) { Review Comment: This will be simplified when I fix the above. Potentially we can even include the 3.3 check in the FeatureVersion class since it will apply to all features and doesn't change on a per feature basis. Stay tuned for some cleanups.
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1597304812 ## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.List; + +public enum TestFeatureVersion implements FeatureVersion { + +TEST_0(0), +TEST_1(1), +TEST_2(2); + +private short featureLevel; + +public static final String FEATURE_NAME = "test.feature.version"; +public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1; + +TestFeatureVersion(int featureLevel) { +this.featureLevel = (short) featureLevel; +} Review Comment: I have a plan for this but didn't quite get to it. Maybe we want to do this and maybe we don't. One idea is to have all features have a standard pattern/class for the fields. This would contain the metadata version mapping and a Map of the required other features. Not sure if we will need any more complicated validation logic than that. We could leave the opportunity to have a more complicated validate method if we choose. 
We can also show an example implementation in the TestFeatureVersion and folks can choose to replicate it.
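The "standard pattern" sketched in the comment above, a metadata-version mapping plus a map of required features with generic validation, could look roughly like this. Every name, level, and signature here is illustrative and hypothetical, not the eventual KAFKA-16308 design:

```java
import java.util.Collections;
import java.util.Map;

public class FeatureDependencySketch {
    // Hypothetical shape: every feature version declares the features (and
    // levels) it depends on, so one generic validate() can replace the
    // per-feature validateVersion logic.
    interface FeatureVersion {
        String featureName();
        short featureLevel();
        Map<String, Short> dependencies(); // feature name -> minimum required level
    }

    enum TestFeatureVersion implements FeatureVersion {
        TEST_0((short) 0, Collections.emptyMap()),
        // TEST_1 depends on a minimum metadata.version level ("14" is illustrative)
        TEST_1((short) 1, Collections.singletonMap("metadata.version", (short) 14));

        private final short level;
        private final Map<String, Short> deps;
        TestFeatureVersion(short level, Map<String, Short> deps) { this.level = level; this.deps = deps; }
        public String featureName() { return "test.feature.version"; }
        public short featureLevel() { return level; }
        public Map<String, Short> dependencies() { return deps; }
    }

    // Generic validation: every declared dependency must be satisfied by the
    // finalized feature levels.
    static void validate(FeatureVersion feature, Map<String, Short> finalized) {
        feature.dependencies().forEach((name, minLevel) -> {
            if (finalized.getOrDefault(name, (short) 0) < minLevel) {
                throw new IllegalArgumentException(feature.featureName() + "=" + feature.featureLevel()
                        + " requires " + name + ">=" + minLevel);
            }
        });
    }

    public static void main(String[] args) {
        validate(TestFeatureVersion.TEST_1, Collections.singletonMap("metadata.version", (short) 14)); // ok
        try {
            validate(TestFeatureVersion.TEST_1, Collections.singletonMap("metadata.version", (short) 13));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```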
Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]
jolshan commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1597303887 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -258,6 +259,13 @@ public enum MetadataVersion { this.didMetadataChange = didMetadataChange; } +public String featureName() { +return FEATURE_NAME; +} + +public void validateVersion(MetadataVersion metadataVersion, List features) { +} Review Comment: I removed this for now, we can add it back later if we want MetadataVersion to implement our interface. I think there are pros and cons for doing so.
[PR] KAFKA-16703: Close serverChannel in SocketServer if unable to bind to a port [kafka]
gharris1727 opened a new pull request, #15923: URL: https://github.com/apache/kafka/pull/15923 This was showing up as a leaked socket in KafkaServerTest#testListenerPortAlreadyInUse.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16703) SocketServer leaks ServerSocketChannel when port is already in-use
Greg Harris created KAFKA-16703: --- Summary: SocketServer leaks ServerSocketChannel when port is already in-use Key: KAFKA-16703 URL: https://issues.apache.org/jira/browse/KAFKA-16703 Project: Kafka Issue Type: Test Reporter: Greg Harris Assignee: Greg Harris The SocketServer#openServerSocket method creates a serverSocket, and then attempts to bind it to the selected port. If the port is already in-use, an exception is propagated and the serverSocket is never closed. This causes KafkaServerTest#testListenerPortAlreadyInUse to leak sockets.
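The leak pattern described in KAFKA-16703 and its fix can be sketched with plain NIO types. This is an illustration of the idea (close the channel before propagating the bind failure), not the actual `SocketServer#openServerSocket` patch:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class OpenServerSocketSketch {
    // If bind() throws (e.g. the port is already in use), close the channel
    // before rethrowing so the ServerSocketChannel is not leaked.
    static ServerSocketChannel openServerSocket(int port) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            channel.bind(new InetSocketAddress("localhost", port));
            return channel;
        } catch (IOException e) {
            channel.close(); // without this, the channel leaks on bind failure
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel first = openServerSocket(0)) {
            int port = ((InetSocketAddress) first.getLocalAddress()).getPort();
            try {
                openServerSocket(port); // same port: bind fails
            } catch (IOException e) {
                // The failed channel has already been closed by openServerSocket.
                System.out.println("bind failed: " + e.getClass().getSimpleName());
            }
        }
    }
}
```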
[PR] KAFKA-16702: Fix producer leaks in KafkaLog4jAppenderTest [kafka]
gharris1727 opened a new pull request, #15922: URL: https://github.com/apache/kafka/pull/15922 The tests `testRealProducerConfigWithSyncSendShouldNotThrowException` and `testRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException` create real producer instances, which are leaked when the test exits. Instead, each test should be followed by a cleanup operation where the registered appender is removed and closed.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-16702) KafkaLog4jAppenderTest leaks producer instances
Greg Harris created KAFKA-16702: --- Summary: KafkaLog4jAppenderTest leaks producer instances Key: KAFKA-16702 URL: https://issues.apache.org/jira/browse/KAFKA-16702 Project: Kafka Issue Type: Test Affects Versions: 3.8.0 Reporter: Greg Harris Assignee: Greg Harris The KafkaLog4jAppenderTest has the method getLog4jConfigWithRealProducer which naturally creates a real producer. This appender is never cleaned up within the test, so the producer is leaked.
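The cleanup described in KAFKA-16702, deregistering the appender after each test and closing it (and with it the producer), can be sketched generically. The `Registry` and `ProducerAppender` types below are hypothetical stand-ins, not log4j's API:

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;

public class AppenderCleanupSketch {
    // Stand-in for an appender that owns a real producer.
    static class ProducerAppender implements Closeable {
        boolean closed;
        @Override public void close() { closed = true; } // would close the real producer
    }

    // Stand-in for the logging framework's appender registry.
    static class Registry {
        final Map<String, ProducerAppender> appenders = new HashMap<>();
        void add(String name, ProducerAppender a) { appenders.put(name, a); }
        ProducerAppender remove(String name) { return appenders.remove(name); }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        ProducerAppender appender = new ProducerAppender();
        registry.add("kafka", appender);
        try {
            // ... test body that logs through the appender ...
        } finally {
            ProducerAppender removed = registry.remove("kafka"); // deregister
            if (removed != null) removed.close();                // close the producer
        }
        System.out.println(appender.closed); // true
    }
}
```

The key is the try/finally: even a failing test body still deregisters and closes the appender, so no producer outlives the test.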
[PR] MINOR: Add classic member session timeout to ClassicMemberMetadata [kafka]
dongnuo123 opened a new pull request, #15921: URL: https://github.com/apache/kafka/pull/15921 The heartbeat api to the consumer group with classic protocol members schedules the session timeout. At present, there's no way to get the classic member session timeout in heartbeat to consumer group. This patch stores the session timeout into the ClassicMemberMetadata in ConsumerGroupMemberMetadataValue and updates it when it's provided in the join request.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-16513) Allow WriteTxnMarkers API with Alter Cluster Permission
[ https://issues.apache.org/jira/browse/KAFKA-16513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan resolved KAFKA-16513. Resolution: Fixed > Allow WriteTxnMarkers API with Alter Cluster Permission > --- > > Key: KAFKA-16513 > URL: https://issues.apache.org/jira/browse/KAFKA-16513 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Nikhil Ramakrishnan >Assignee: Siddharth Yagnik >Priority: Minor > Labels: KIP-1037 > Fix For: 3.8.0 > > > We should allow WriteTxnMarkers API with Alter Cluster Permission because it > can be invoked externally by a Kafka AdminClient. Such usage is more aligned > with the Alter permission on the Cluster resource, which includes other > administrative actions invoked from the Kafka AdminClient.
Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]
jolshan merged PR #15837: URL: https://github.com/apache/kafka/pull/15837
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1597073119 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. -* If the message offset is out of range, throw an OffsetOutOfRangeException +* 1. If the message offset is less than the log-start-offset (or) local-log-start-offset, then it returns the + * message-only metadata. +* 2. If the message offset is beyond the log-end-offset, then it returns the message-only metadata. +* 3. For all other cases, it returns the offset metadata from the log. */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { -checkLogStartOffset(offset) -localLog.convertToOffsetMetadataOrThrow(offset) + private[log] def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = { Review Comment: convertToOffsetMetadata => maybeConvertToOffsetMetadata ? 
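The "message-only metadata" behavior described in the javadoc above can be sketched in isolation. Field and method names are illustrative, loosely modeled on `LogOffsetMetadata`, and the in-range branch stands in for the real segment lookup:

```java
public class OffsetMetadataSketch {
    static final long UNKNOWN = -1L;

    // Offset metadata either carries a full log position or, when the offset
    // cannot be mapped to a segment, only the message offset itself.
    static class LogOffsetMetadata {
        final long messageOffset;
        final long segmentBaseOffset;        // UNKNOWN for message-only metadata
        final int relativePositionInSegment;
        LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int pos) {
            this.messageOffset = messageOffset;
            this.segmentBaseOffset = segmentBaseOffset;
            this.relativePositionInSegment = pos;
        }
        boolean messageOffsetOnly() { return segmentBaseOffset == UNKNOWN; }
    }

    // The three javadoc cases: offsets below the (local) log start or beyond
    // the log end yield message-only metadata instead of throwing
    // OffsetOutOfRangeException; in-range offsets are resolved from the log.
    static LogOffsetMetadata maybeConvert(long offset, long logStartOffset, long logEndOffset) {
        if (offset < logStartOffset || offset > logEndOffset) {
            return new LogOffsetMetadata(offset, UNKNOWN, 0); // message-only
        }
        return new LogOffsetMetadata(offset, /* looked up from the log: */ 0L, 0);
    }

    public static void main(String[] args) {
        System.out.println(maybeConvert(450L, 500L, 1000L).messageOffsetOnly()); // true: below log start
        System.out.println(maybeConvert(600L, 500L, 1000L).messageOffsetOnly()); // false: in range
    }
}
```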
## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + +val fetchResult = fetchResultOpt.get +assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}") + @ValueSource(ints = Array(1, 2)) + def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = { +val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") +val fetchOffset = 450L +val logStartOffset = 5L +val currentLeaderEpoch = Optional.of[Integer](10) +val replicaId = 1 + +val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) +val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes) + +var fetchResultOpt: Option[FetchPartitionData] = None +def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) +} + +val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback +) + +val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) +// high-watermark is lesser than the log-start-offset +val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0) +when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) +when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false) 
+expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + +val expected = minBytes == 1 +assertEquals(expected, delayedFetch.tryComplete()) +assertEquals(expected, delayedFetch.isCompleted) Review Comment: This exposes an issue in delayedFetch. If HWM is less than fetchOffset, we haven't gained any bytes. So, we shouldn't complete the delayedFetch immediately. ## storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.KafkaException; +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static
[jira] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687 ] Philip Nee deleted comment on KAFKA-16687: was (Author: JIRAUSER283568): [~fortherightous] - Not sure if this is what you are seeing: {code:java} Native Memory Tracking (reserved=3835KB +481KB, committed=3835KB +481KB) 55 (malloc=458KB +234KB #6808 +3631) 56 (tracking overhead=3376KB +247KB) {code} and {code:java} 290 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc 291 [0x000147f01034] 292 (malloc=43KB type=Other +43KB #4 +4) 293 294 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc 295 [0x00014753d8ac] 296 (malloc=996KB type=Other -1249KB #94 -102) {code} > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: FTR >Assignee: Philip Nee >Priority: Major > > I am building a Java project that uses the Maven dependency kafka-clients, version 3.7.0. > The application uses a Kafka consumer to poll a Kafka broker topic continuously. > I configured the JVM with -Xms8G -Xmx8G > -XX:MaxMetaspaceSize=4G and ran it; the virtual machine has 16G of physical memory. > After the application had been running for a long time, the resident memory of the Java process grew to more than 14G, > and in the end the process ate into swap space. > Checking with jmap -heap pid showed heap memory usage is OK. > Native Memory Tracking [jcmd pid VM.native_memory detail.diff] showed the growth is in [NMT Internal] memory created by > Unsafe_allocatememory xxx. > My application does not use any NIO DirectByteBuffer to allocate memory. > The kafka-clients source code does use "sun.misc.Unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it.
> > Could you help to check it? How could I to stop this growing native memory to > avoid my System hang? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15045: (KIP-924): Implement ApplicationState and KafkaStreamsState [kafka]
apourchet opened a new pull request, #15920: URL: https://github.com/apache/kafka/pull/15920 This PR implements read-only container classes for ApplicationState and KafkaStreamsState, and initializes those within StreamsPartitionAssignor#assign. New internal methods were also added to the ClientState to easily pass this data through to the KafkaStreamsState. One test was added to check the lag sorting within the implementation of KafkaStreamsState, which is the counterpart to the test that existed for the ClientState class. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on PR #15909: URL: https://github.com/apache/kafka/pull/15909#issuecomment-2105274302 Thanks all for the helpful feedback! Let's wait for the build and we should be good @mjsax
[jira] [Commented] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845497#comment-17845497 ] Justine Olshan commented on KAFKA-16699: Yay yay! I'm happy this is getting fixed :) > Have Streams treat InvalidPidMappingException like a ProducerFencedException > > > Key: KAFKA-16699 > URL: https://issues.apache.org/jira/browse/KAFKA-16699 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > > KStreams is able to handle the ProducerFenced (among other errors) cleanly. > It does this by closing the task dirty and triggering a rebalance amongst the > worker threads to rejoin the group. The producer is also recreated. Due to > how streams works (writing to and reading from various topics), the > application is able to figure out the last thing the fenced producer > completed and continue from there. > KStreams EOS V2 also trusts that any open transaction (including those whose > producer is fenced) will be aborted by the server. This is a key factor in > how it is able to operate. In EOS V1, the new InitProducerId fences and > aborts the previous transaction. In either case, we are able to reason about > the last valid state from the fenced producer and how to proceed. > h2. InvalidPidMappingException ≈ ProducerFenced > I argue that InvalidPidMappingException can be handled in the same way. Let > me explain why. > There are two cases we see this error: > # > > {{txnManager.getTransactionState(transactionalId).flatMap { case None => > Left(Errors.INVALID_PRODUCER_ID_MAPPING)}} > # > > {{if (txnMetadata.producerId != producerId) > Left(Errors.INVALID_PRODUCER_ID_MAPPING)}} > h3. Case 1 > We are missing a value in the transactional state map for the transactional > ID. 
Under normal operations, this is only possible when the transactional ID > expires via the mechanism described above after > {{transactional.id.expiration.ms}} of inactivity. In this case, there is no > state that needs to be reconciled. It is safe to just rebalance and rejoin > the group with a new producer. We probably don’t even need to close the task > dirty, but it doesn’t hurt to do so. > h3. Case 2 > This is a bit more interesting. It says that we have transactional state, but > the producer ID in the request does not match the producer ID associated with > the transactional ID on the broker. How can this happen? > It is possible that a new producer instance B with the same transactional ID > was created after the transactional state expired for instance A. Given there > is no state on the server when B joins, it will get a totally new producer > ID. If the original producer A comes back, it will have state for this > transactional ID but the wrong producer ID. > In this case, the old producer ID is fenced; it’s just that the normal epoch-based > fencing logic doesn’t apply. We can treat it the same, however. > h2. Summary > As described in the cases above, any time we encounter the InvalidPidMapping > during normal operation, the previous producer was either finished with its > operations or was fenced. Thus, it is safe to close the task dirty and rebalance + > rejoin the group just as we do with the ProducerFenced exception.
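The argument above reduces to a small classification rule in the Streams error-handling path: both exceptions trigger the same recovery (close the task dirty, recreate the producer, rejoin the group). A minimal, self-contained sketch of that rule — using local stand-in exception classes rather than the real `org.apache.kafka.common.errors` types, and a hypothetical `TaskAction` enum, so this is an illustration of the proposal, not the actual Streams code:

```java
// Stand-ins for org.apache.kafka.common.errors.ProducerFencedException and
// org.apache.kafka.common.errors.InvalidPidMappingException; a real handler
// would catch the Kafka types directly.
class ProducerFencedException extends RuntimeException { }
class InvalidPidMappingException extends RuntimeException { }

enum TaskAction { CLOSE_DIRTY_AND_REJOIN, FAIL }

final class StreamsErrorClassifier {
    // The proposed change: treat InvalidPidMappingException exactly like
    // ProducerFencedException, since in both cases the old producer's state
    // is either complete or safely fenced/aborted.
    static TaskAction classify(RuntimeException e) {
        if (e instanceof ProducerFencedException || e instanceof InvalidPidMappingException) {
            return TaskAction.CLOSE_DIRTY_AND_REJOIN;
        }
        return TaskAction.FAIL;
    }
}
```

Any other error still falls through to the existing failure path, which matches the JIRA's claim that only these two cases are known to be safe to recover from by rejoining.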
[jira] [Assigned] (KAFKA-15804) Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup
[ https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-15804: --- Fix Version/s: 3.8.0 Assignee: Greg Harris Resolution: Fixed > Broker leaks ServerSocketChannel when exception is thrown from > ZkConfigManager during startup > - > > Key: KAFKA-15804 > URL: https://issues.apache.org/jira/browse/KAFKA-15804 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > Fix For: 3.8.0 > > > This exception is thrown during the > RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic > test in zk mode: > {noformat} > org.apache.kafka.common.config.ConfigException: You have to delete all topics > with the property remote.storage.enable=true before disabling tiered storage > cluster-wide > at > org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566) > at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956) > at > kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73) > at > kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94) > at > kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176) > at > kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175) > at scala.collection.immutable.Map$Map2.foreach(Map.scala:360) > at > kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175) > at > kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166) > at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115) > at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166) > at kafka.server.KafkaServer.startup(KafkaServer.scala:575) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) > at > 
kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) > at scala.collection.immutable.List.foreach(List.scala:333) > at > kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) > at > kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146) > at > kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319) > at > org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53) > at > org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35) > at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111) > at > kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat} > This leak only occurs for this one test in the RemoteTopicCrudTest; all other > tests including the kraft-mode version do not exhibit a leaked socket. 
> Here is where the ServerSocket is instantiated: > {noformat} > at > java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113) > at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724) > at kafka.network.Acceptor.<init>(SocketServer.scala:608) > at kafka.network.DataPlaneAcceptor.<init>(SocketServer.scala:454) > at > kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270) > at > kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249) > at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175) > at > kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) > at scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at kafka.network.SocketServer.<init>(SocketServer.scala:175) > at kafka.server.KafkaServer.startup(KafkaServer.scala:344) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356) > at > kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352) > at scala.collection.immutable.List.foreach(List.scala:333) > at > kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352) >
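The general pattern behind fixing a leak like this is to close the channel on any exception thrown between `open()` and the point where ownership is handed off to the caller. A hedged, self-contained sketch of that pattern (this is not the actual `SocketServer` code, just the idiom the fix relies on):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class SafeSocketOpen {
    // Open and bind a ServerSocketChannel; if anything after open() throws,
    // close the channel before propagating, so the socket is never leaked.
    static ServerSocketChannel openAndBind(int port) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            channel.socket().bind(new InetSocketAddress(port));
            return channel;
        } catch (IOException | RuntimeException e) {
            channel.close(); // cleanup on the failure path, analogous to the KAFKA-15804 fix
            throw e;
        }
    }
}
```

In the bug above, the exception was thrown later in startup (from `ZkConfigManager`) after the acceptor had already opened its socket, so the equivalent cleanup has to happen at the `SocketServer` shutdown path rather than in a single method — but the invariant is the same: whoever opened the channel must close it on every failure path.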
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
gharris1727 merged PR #14729: URL: https://github.com/apache/kafka/pull/14729
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1597219039 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -193,11 +193,8 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { } pollTimer.update(currentTimeMs); if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { -logger.warn("Consumer poll timeout has expired. This means the time between " + Review Comment: Done, I did like the simplified log but totally agree with your points, both. I've been myself pushing for avoiding changing the existing logs content when possible because I've also heard about customers basing their apps on them. Also agree about the more complete output on the case of not hitting the next poll in a sensible time. So left the log here unchanged (and simplified the other just to not repeat ourselves on the 2 logs). So in the common case that we end up with the 2 log lines, it's just a first one about the situation when it happens, and the 2nd one with the approximate exceeded time when we have the most accurate info. Makes sense?
Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]
gharris1727 commented on PR #14729: URL: https://github.com/apache/kafka/pull/14729#issuecomment-2105256727 I noticed some instability in the SocketServerTest suite locally, but it doesn't appear to be introduced by this change. It appears on trunk (and 3.7, 3.6, 3.5) and coincides with JDK >= 17. I ran this test suite locally with JDK 11 and got consistent passes, and it passes in CI. The other failures in CI look unrelated, and pass locally. I think I'm comfortable merging this PR at this time.
[jira] [Updated] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally
[ https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16701: Description: These tests are failing for me on a local development environment, but don't appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. I'm using an M1 Mac, so it is possible that either the Mac's linear port allocation, or a native implementation is impacting this. closingChannelSendFailure() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690) at kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434) at kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430) at kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat} closingChannelWithBufferedReceivesFailedSend() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) at kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat} closingChannelWithCompleteAndIncompleteBufferedReceives() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) at kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511) {noformat} remoteCloseWithBufferedReceives() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) at kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453){noformat}
[jira] [Created] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally
Greg Harris created KAFKA-16701: --- Summary: Some SocketServerTest buffered close tests flaky failing locally Key: KAFKA-16701 URL: https://issues.apache.org/jira/browse/KAFKA-16701 Project: Kafka Issue Type: Test Components: core, unit tests Affects Versions: 3.7.0, 3.6.0, 3.5.0 Reporter: Greg Harris These tests are failing for me on a local development environment, but don't appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. I'm using an M1 Mac, so it is possible that either the Mac's linear port allocation, or a native implementation is impacting this. closingChannelSendFailure() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690) at kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434) at kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430) at kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat} closingChannelWithBufferedReceivesFailedSend() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) at kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat} closingChannelWithCompleteAndIncompleteBufferedReceives() {noformat} java.lang.AssertionError: receiveRequest timed out at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) at kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511) {noformat} remoteCloseWithBufferedReceives() {noformat} java.lang.AssertionError: receiveRequest timed out at kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) at kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) at kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453){noformat}
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
ableegoldman commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1597202905 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -193,11 -193,8 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { } pollTimer.update(currentTimeMs); if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { -logger.warn("Consumer poll timeout has expired. This means the time between " + Review Comment: Can you actually leave this log untouched? On the one hand I kind of agree with this simplification, and logs are by no means a part of the public contract, but I know for a fact that some people have built observation tools and/or dashboards for things like rebalancing issues by searching for relevant log strings such as this one (I know because I built one myself a long time ago) I don't feel super strongly about this so I won't push back if you'd prefer to clean it up, but imo it doesn't hurt to leave the log here as well Also: in some extreme cases, eg an infinite loop in a user's processing logic, the consumer might never return to call `poll` at all. In less extreme cases, eg some kind of long processing that takes on the order of minutes per record, it might be a very very long time before the consumer gets back to poll and logs the message you added. For the latter case, I think it would be valuable to keep this part about increasing the max.poll.interval or lowering the max.poll.records in the message we log here, when the max poll interval is first missed, so that users know what to do immediately and don't have to wait until they actually get through all 1000 records (or whatever max.poll.records is set to) and finally return to poll to see a hint about which configs to change
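The configuration hint discussed in this thread maps to two real consumer settings: `max.poll.interval.ms` (the maximum time allowed between `poll()` calls before the member is considered failed; default 300000, i.e. 5 minutes) and `max.poll.records` (how many records each `poll()` hands back; default 500). A hedged sketch of tuning them for slow per-record processing — the values here are illustrative, not recommendations:

```java
import java.util.Properties;

final class PollTuning {
    // Consumer properties for a workload that processes each record slowly.
    // The config keys are the real Kafka consumer config names; the values
    // are example choices, to be sized against actual per-record latency.
    static Properties slowProcessingConsumerProps() {
        Properties props = new Properties();
        // Allow up to 10 minutes between poll() calls (default: 5 minutes).
        props.setProperty("max.poll.interval.ms", "600000");
        // Hand back fewer records per poll() so each batch finishes sooner
        // (default: 500).
        props.setProperty("max.poll.records", "50");
        return props;
    }
}
```

The rule of thumb implied by the log message is that `max.poll.records × worst-case-per-record-time` must stay comfortably below `max.poll.interval.ms`.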
[jira] [Created] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join
Karsten Stöckmann created KAFKA-16700: - Summary: Kafka Streams: possible message loss on KTable-KTable FK Left Join Key: KAFKA-16700 URL: https://issues.apache.org/jira/browse/KAFKA-16700 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.7.0 Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators Reporter: Karsten Stöckmann We are experiencing significant, yet intermittent / non-deterministic / unexplainable message loss on a Kafka Streams topology while performing a *KTable-KTable* {*}FK Left Join{*}. Assume the following snippet: {code:java} streamsBuilder .table( folderTopicName, Consumed.with( folderKeySerde, folderSerde)) .leftJoin( agencies, // KTable Folder::agencyIdValue, AggregateFolder::new, TableJoined.as("folder-to-agency"), Materialized .as("folder-to-agency-materialized") .withKeySerde(folderKeySerde) .withValueSerde(aggregateFolderSerde)) .leftJoin( documents, {code} The setup is as follows: A Debezium Connector for PostgreSQL streams database changes into various Kafka topics. A series of Quarkus Kafka Streams applications then performs aggregation operations on those topics to create index documents later to be sent into an OpenSearch system. When firing up the Kafka Streams infrastructure to work on initially populated Kafka Topics (i.e. a snapshot of all relevant table data has been streamed to Kafka), the above shown KTable-KTable FK Left Join seems to produce message loss on the first of a series of FK Left Joins; the right hand {{KTable}} is consumed from an aggregated topic fed from another Kafka Streams topology / application. 
On a (heavily reduced) test data set of 6828 messages in the {{folderTopicName}} Topic, we observe the following results: * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages * {{{}folder-to-agency-subscription-response{}}}: *3048* messages * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages. Telling from the nature of a (FK) Left Join, I'd expect all messages from the left hand topic should produce an aggregate even if no matching message is found in the right hand topic. Message loss unpredictably varies across tests and seems not to be bound to specific keys or messages. As it seems, this can only be observed when initially firing up the Streams infrastructure to process the message 'backlog' that had been snapshotted by Debezium. A manual snapshot triggered later (i.e. Streams applications already running) seems not to show this behaviour. Additionally, as of yet we observed this kind of message loss only when running multiple replicas of the affected application. When carrying out the tests with only one replica, everything seems to work as expected. We've tried to leverage {{group.initial.rebalance.delay.ms}} in order to rule out possible rebalancing issues, but to no avail. 
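The expectation stated above — every left-hand record should produce an output even when no right-hand match exists — is the defining invariant of a left join. A minimal reference model with plain maps (not the Streams FK-join implementation; the generic names here are illustrative) makes that invariant checkable against observed topic counts:

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class FkLeftJoinModel {
    // Reference semantics for an FK left join: every left-hand record yields
    // exactly one output, pairing it with the matching right-hand value or null.
    static <K, L, FK, R> Map<K, Map.Entry<L, R>> leftJoin(
            Map<K, L> left, Map<FK, R> right, Function<L, FK> fkExtractor) {
        Map<K, Map.Entry<L, R>> out = new LinkedHashMap<>();
        for (Map.Entry<K, L> e : left.entrySet()) {
            R match = right.get(fkExtractor.apply(e.getValue())); // may be null; emit anyway
            out.put(e.getKey(), new AbstractMap.SimpleEntry<>(e.getValue(), match));
        }
        return out;
    }
}
```

Against this model, the materialized store for the join above should hold one entry per key in the folder topic; the 4644-vs-6828 gap reported in the ticket is what violates the invariant.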
Our Kafka configuration: {code:yaml} offsets.topic.replication.factor: 3 transaction.state.log.replication.factor: 3 transaction.state.log.min.isr: 2 default.replication.factor: 3 min.insync.replicas: 2 message.max.bytes: "20971520" {code} Our Kafka Streams application configuration: {code:yaml} kafka-streams.num.stream.threads: 5 kafka-streams.num.standby.replicas: 1 kafka-streams.auto.offset.reset: earliest kafka-streams.cache.max.bytes.buffering: "20971520" kafka-streams.commit.interval.ms: 100 kafka-streams.fetch.max.bytes: "10485760" kafka-streams.max.request.size: "10485760" kafka-streams.max.partition.fetch.bytes: "10485760" kafka-streams.metadata.max.age.ms: 30 kafka-streams.statestore.cache.max.bytes: "20971520" kafka-streams.topology.optimization: all kafka-streams.processing.guarantee: exactly_once_v2 # Kafka Streams Intermediate Topics kafka-streams.topic.compression.type: lz4 kafka-streams.topic.segment.ms: "4320" # 12h kafka-streams.topic.max.compaction.lag.ms: "8640" # 24h kafka-streams.topic.delete.retention.ms: "8640" # 24h kafka-streams.producer.max.request.size: "20971520" # 20MiB kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2 kafka-streams.consumer.group.instance.id: ${HOSTNAME} kafka-streams.consumer.heartbeat.interval.ms: 100 kafka-streams.consumer.session.timeout.ms: 45000 {code} All input (and aggregate) topics feature 15 partitions and share this configuration: {code:yaml}
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845464#comment-17845464 ] Philip Nee commented on KAFKA-16687: [~fortherightous] - Not sure if this is what you are seeing: {code:java} Native Memory Tracking (reserved=3835KB +481KB, committed=3835KB +481KB) 55 (malloc=458KB +234KB #6808 +3631) 56 (tracking overhead=3376KB +247KB) {code} and {code:java} 290 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc 291 [0x000147f01034] 292 (malloc=43KB type=Other +43KB #4 +4) 293 294 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc 295 [0x00014753d8ac] 296 (malloc=996KB type=Other -1249KB #94 -102) {code} > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: FTR >Assignee: Philip Nee >Priority: Major > > I am building a Java project using the Maven dependency kafka-clients, > version 3.7.0. > My application uses a Kafka consumer to poll a Kafka broker topic > continuously. > I configured the application with the JVM options -Xms8G -Xmx8G > -XX:MaxMetaspaceSize=4G and ran it. > There is also 16G of physical memory on my virtual machine. > After the application had been running for a long time, I found that the > resident memory of the Java process had grown to more than 14G. > In the end, the Java process ate into swap space. > I checked with jmap -heap pid and found that heap memory usage is OK.
> With Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I > found the growth is in [NMT Internal] memory, created by > Unsafe_allocateMemory. > In my application, I don't use any NIO DirectByteBuffer to allocate > memory. > I checked the kafka-clients source code; it has code that uses > "sun.misc.Unsafe" to allocate memory, and MaxMetaspaceSize does not apply > to it. > > Could you help check this? How can I stop this native memory growth so > my system does not hang?
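One way to confirm (or rule out) direct-buffer growth from inside the JVM, without native tooling, is the standard `java.lang.management.BufferPoolMXBean` API. A hedged monitoring sketch — note that raw `Unsafe.allocateMemory` calls like those in the NMT output above bypass this pool, so a flat reading here combined with growing RSS points at non-NIO native allocation:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

final class DirectMemoryProbe {
    // Returns the bytes currently held by the "direct" NIO buffer pool,
    // or -1 if the pool is not exposed (it is on HotSpot JVMs).
    static long directPoolUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }
}
```

Sampling this value periodically alongside process RSS helps distinguish a DirectByteBuffer leak (both grow) from the `Unsafe_AllocateMemory0` pattern reported here (only RSS grows).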
Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]
C0urante merged PR #15906: URL: https://github.com/apache/kafka/pull/15906
Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]
C0urante commented on PR #15906: URL: https://github.com/apache/kafka/pull/15906#issuecomment-2105098368 CI has passed on at least one node and the changes are trivial; merging...
Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]
jsancio commented on PR #15825: URL: https://github.com/apache/kafka/pull/15825#issuecomment-2105087021 @kamalcph, looks like a bug to me. The predicate should be `if (!hwm.messageOffsetOnly)` or the if/else blocks should be swapped. I suspect that we haven't noticed this bug in the KRaft implementation (`KafkaRaftClient`) because kraft never looks at the segment and byte position for the HWM. If you are going to fix this code, do you mind adding a test for this case? Since `KafkaMetadataLog` calls `UnifiedLog.fetchOffsetSnapshot`, `hwm.messageOffsetOnly` should always be false.
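The inverted check described above can be illustrated with a dependency-free sketch. This is hypothetical code, not the actual `UnifiedLog` implementation: `messageOffsetOnly == true` means the high watermark carries only an offset and no segment/byte position, so that is exactly the case in which a conversion to full metadata is needed.

```java
// Hypothetical sketch of the predicate direction discussed above; not the
// real UnifiedLog code. messageOffsetOnly == true means the HWM has an
// offset but no segment/byte position yet.
public class HwmPredicateDemo {
    static String resolve(boolean messageOffsetOnly) {
        if (!messageOffsetOnly) {
            // full metadata already present: nothing to look up
            return "use-as-is";
        }
        // offset-only: this branch is where the segment/position lookup belongs
        return "convert-to-full-metadata";
    }

    public static void main(String[] args) {
        System.out.println(resolve(false)); // use-as-is
        System.out.println(resolve(true));  // convert-to-full-metadata
    }
}
```

Swapping the branches (or dropping the `!`) silently skips the lookup, which is the bug pattern being pointed out.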
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
AndrewJSchofield commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1597080823 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) { * member to {@link MemberState#JOINING}, so that it rejoins the group. */ public void resetPollTimer(final long pollMs) { +pollTimer.update(pollMs); if (pollTimer.isExpired()) { Review Comment: Yes, makes sense. When I was reviewing the previous iteration, I found myself looking within the Timer at the internal variables and then trying to figure out whether the derivation being performed was valid. Makes sense to do it within the Timer. Perfectly happy with 2 methods like this.
Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]
kamalcph commented on code in PR #15917: URL: https://github.com/apache/kafka/pull/15917#discussion_r1597048065 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Assertions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; + +class RemoteLogMetadataManagerTestUtils { +private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class); + +static final int METADATA_TOPIC_PARTITIONS_COUNT = 3; +static final short METADATA_TOPIC_REPLICATION_FACTOR = 2; +static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L; + +static Builder builder() { +return new Builder(); +} + +static class Builder { +private String bootstrapServers; +private Set<TopicIdPartition> topicIdPartitions = Collections.emptySet(); +private boolean startConsumerThread; +private RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner; +private Map<String, Object> overrideRemoteLogMetadataManagerProps = Collections.emptyMap(); + 
+private Builder() { +} + +public Builder bootstrapServers(String bootstrapServers) { +this.bootstrapServers = Objects.requireNonNull(bootstrapServers); +return this; +} + +public Builder topicIdPartitions(Set<TopicIdPartition> topicIdPartitions) { +this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions); +return this; +} + +public Builder startConsumerThread(boolean startConsumerThread) { +this.startConsumerThread = startConsumerThread; +return this; +} + +public Builder remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner remoteLogMetadataTopicPartitioner) { +this.remoteLogMetadataTopicPartitioner = Objects.requireNonNull(remoteLogMetadataTopicPartitioner); +return this; +} + +public Builder overrideRemoteLogMetadataManagerProps(Map<String, Object> overrideRemoteLogMetadataManagerProps) { +this.overrideRemoteLogMetadataManagerProps = Objects.requireNonNull(overrideRemoteLogMetadataManagerProps); +return this; +} + +public TopicBasedRemoteLogMetadataManager build() { +String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); +TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread); + +// Initialize TopicBasedRemoteLogMetadataManager. +Map<String, Object> configs = new HashMap<>(); +configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +configs.put(BROKER_ID, 0); +configs.put(LOG_DIR, logDir); +configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP,
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
junrao commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2104991266 @clolov @nikramakrishnan : If an RPC is only used by the client, we don't need to bump up the IBP. However, ListOffsetRequest is used by both the client and the broker. If we don't bump the IBP, we can't test the logic for the new ListOffsetRequest on the broker, right? > Otherwise what happens is that the broker treats the version as non-existent while clients don't respect the configuration and still send the new version. Hmm, normally, a client first sends an ApiVersionRequest to the broker to get exposed API versions. The broker decides whether to expose the latest version based on `unstable.api.versions.enable`. If the broker doesn't expose the latest version, the client shouldn't use it. Also, if somehow the client ignores this and indeed sends the new version, it seems that the broker will still take it. Could you explain the problem a bit more?
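The negotiation junrao describes can be sketched without any Kafka dependencies: the client picks the highest version both sides support from the ApiVersions response, so a broker that hides an unstable version keeps well-behaved clients on the older one. The version numbers below are illustrative, not the real ListOffsets version range.

```java
// Dependency-free sketch of per-API version negotiation: a client that honors
// the broker's advertised range never sends a version the broker hid behind
// unstable.api.versions.enable. Numbers here are illustrative only.
public class VersionNegotiationDemo {
    static short negotiate(short clientMin, short clientMax, short brokerMin, short brokerMax) {
        // highest version both sides understand
        short usable = (short) Math.min(clientMax, brokerMax);
        if (usable < Math.max(clientMin, brokerMin)) {
            throw new IllegalStateException("no common version for this API");
        }
        return usable;
    }

    public static void main(String[] args) {
        // client supports the new v9, but the broker only advertises up to v8,
        // so the client falls back to v8
        System.out.println(negotiate((short) 0, (short) 9, (short) 0, (short) 8));
    }
}
```

A client that skips this step and sends its own maximum anyway is exactly the misbehaving case being debated in the thread.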
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on PR #15909: URL: https://github.com/apache/kafka/pull/15909#issuecomment-2104982949 Just to clarify what we're getting here, related to @AndrewJSchofield 's very valid point. With this we get the time between internal poll events, which do not translate exactly to calls to consumer.poll depending on the situation. So the log here will be very helpful to tune the config in cases where the delay that led to leaving the group was due to the client app taking too long to process messages after a call to poll. It would be less accurate in cases where the delay is due to the fetch not getting messages for instance, since we internally generate more poll events while at it.
[jira] [Assigned] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-16699: -- Assignee: Walker Carlson > Have Streams treat InvalidPidMappingException like a ProducerFencedException > > > Key: KAFKA-16699 > URL: https://issues.apache.org/jira/browse/KAFKA-16699 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Major > > KStreams is able to handle the ProducerFenced (among other errors) cleanly. > It does this by closing the task dirty and triggering a rebalance amongst the > worker threads to rejoin the group. The producer is also recreated. Due to > how streams works (writing to and reading from various topics), the > application is able to figure out the last thing the fenced producer > completed and continue from there. > KStreams EOS V2 also trusts that any open transaction (including those whose > producer is fenced) will be aborted by the server. This is a key factor in > how it is able to operate. In EOS V1, the new InitProducerId fences and > aborts the previous transaction. In either case, we are able to reason about > the last valid state from the fenced producer and how to proceed. > h2. InvalidPidMappingException ≈ ProducerFenced > I argue that InvalidPidMappingException can be handled in the same way. Let > me explain why. > There are two cases we see this error: > # > > {{txnManager.getTransactionState(transactionalId).flatMap { case None => > Left(Errors.INVALID_PRODUCER_ID_MAPPING)}} > # > > {{if (txnMetadata.producerId != producerId) > Left(Errors.INVALID_PRODUCER_ID_MAPPING)}} > h3. Case 1 > We are missing a value in the transactional state map for the transactional > ID. Under normal operations, this is only possible when the transactional ID > expires via the mechanism described above after > {{transactional.id.expiration.ms}} of inactivity. 
In this case, there is no > state that needs to be reconciled. It is safe to just rebalance and rejoin > the group with a new producer. We probably don’t even need to close the task > dirty, but it doesn’t hurt to do so. > h3. Case 2 > This is a bit more interesting. It says that we have transactional state, but > the producer ID in the request does not match the producer ID associated with > the transactional ID on the broker. How can this happen? > It is possible that a new producer instance B with the same transactional ID > was created after the transactional state expired for instance A. Given there > is no state on the server when B joins, it will get a totally new producer > ID. If the original producer A comes back, it will have state for this > transactional ID but the wrong producer ID. > In this case, the old producer ID is fenced, it’s just the normal epoch-based > fencing logic doesn’t apply. We can treat it the same however. > h2. Summary > As described in the cases above, any time we encounter the InvalidPidMapping > during normal operation, the previous producer was either finished with its > operations or was fenced. Thus, it is safe to close the task dirty and rebalance + > rejoin the group just as we do with the ProducerFenced exception.
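The proposal above boils down to routing one more exception type down an existing recovery path. A hedged, self-contained sketch of that idea follows; the exception classes and the `handle` method are stand-ins, not the real `org.apache.kafka.common.errors` classes or Streams internals.

```java
// Sketch of the proposed handling: InvalidPidMappingException takes the same
// "close task dirty, recreate producer, rejoin group" path as ProducerFenced.
// All names here are illustrative stand-ins, not Kafka Streams code.
public class FencingDemo {
    static class ProducerFencedException extends RuntimeException {}
    static class InvalidPidMappingException extends RuntimeException {}

    // returns the action a stream thread would take for the given error
    static String handle(RuntimeException e) {
        if (e instanceof ProducerFencedException || e instanceof InvalidPidMappingException) {
            // in both cases the old producer can no longer commit anything,
            // so closing dirty and rejoining is safe
            return "close-dirty-and-rejoin";
        }
        return "fail-task";
    }

    public static void main(String[] args) {
        System.out.println(handle(new ProducerFencedException()));    // close-dirty-and-rejoin
        System.out.println(handle(new InvalidPidMappingException())); // close-dirty-and-rejoin
        System.out.println(handle(new IllegalStateException()));      // fail-task
    }
}
```

The safety argument in the ticket is what justifies collapsing the two branches: in both cases the previous producer either finished its work or was fenced.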
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1597000322 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) { * member to {@link MemberState#JOINING}, so that it rejoins the group. */ public void resetPollTimer(final long pollMs) { +pollTimer.update(pollMs); if (pollTimer.isExpired()) { Review Comment: agree, makes total sense, so moved the calculation to the timer, with an `isExpiredBy`. Small twist to what I understand you were suggesting, I kept the `isExpired` check, just to avoid having to deal with the logic of deducing if the timer is expired based on the `isExpiredBy` on the HBManager. Seems better to let the timer know the semantics of when it's considered expired (it does consider >= for instance, so just avoiding to bring those semantics into the HBManager). Makes sense?
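The design discussed above, keeping both `isExpired` and an `isExpiredBy` helper inside the timer so the `>=` semantics never leak into the HeartbeatRequestManager, can be sketched as follows. Names and semantics here are assumptions for illustration, not the real `org.apache.kafka.common.utils.Timer` API.

```java
// Hedged sketch of the two-method timer design discussed in the review:
// expiry semantics (including the ">=" boundary) live inside the timer,
// and callers only ask "expired?" and "by how much?".
public class PollTimerDemo {
    static class PollTimer {
        private final long timeoutMs;
        private final long startMs;
        private long currentMs;

        PollTimer(long timeoutMs, long nowMs) {
            this.timeoutMs = timeoutMs;
            this.startMs = nowMs;
            this.currentMs = nowMs;
        }

        void update(long nowMs) { currentMs = Math.max(currentMs, nowMs); }

        // the ">=" boundary lives here, so callers never re-derive it
        boolean isExpired() { return currentMs - startMs >= timeoutMs; }

        // by how many ms the deadline was missed; 0 when not expired
        long isExpiredBy() { return Math.max(0, currentMs - startMs - timeoutMs); }
    }

    public static void main(String[] args) {
        PollTimer timer = new PollTimer(100, 0);
        timer.update(150); // simulated late poll
        System.out.println(timer.isExpired());   // true
        System.out.println(timer.isExpiredBy()); // 50
    }
}
```

The `isExpiredBy()` value is what the improved warning log can report as "exceeded by N ms".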
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1596994382 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) { * member to {@link MemberState#JOINING}, so that it rejoins the group. */ public void resetPollTimer(final long pollMs) { +pollTimer.update(pollMs); if (pollTimer.isExpired()) { -logger.debug("Poll timer has been reset after it had expired"); +logger.warn("Time between subsequent calls to poll() was longer than the configured" + +"max.poll.interval.ms, exceeded by %s ms. This typically implies that the " + Review Comment: yeap, my bad, I had found it too so it's fixed in a commit above
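The bug acknowledged above is the `%s` in the warning: that is a printf-style placeholder, while SLF4J-style loggers substitute `{}` markers, so the log line would print a literal `%s`. A dependency-free sketch of the difference (the `slf4jFormat` helper is a minimal stand-in, not the SLF4J API):

```java
// Sketch of the "%s" vs "{}" placeholder mix-up fixed in the review above.
// slf4jFormat is a toy stand-in for SLF4J's "{}" substitution.
public class PlaceholderDemo {
    static String slf4jFormat(String template, Object arg) {
        return template.replaceFirst("\\{\\}", String.valueOf(arg));
    }

    public static void main(String[] args) {
        long exceededMs = 1234L;
        // printf-style: works with String.format, NOT inside logger.warn templates
        System.out.println(String.format("exceeded by %s ms", exceededMs));
        // SLF4J-style: what logger.warn("... {} ms", exceededMs) would produce
        System.out.println(slf4jFormat("exceeded by {} ms", exceededMs));
        // mixing them up leaves the literal "%s" in the log line
        System.out.println(slf4jFormat("exceeded by %s ms", exceededMs));
    }
}
```

The third line is what users would have seen in their logs before the fix.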
Re: [PR] MINOR; Validate at least one control record [kafka]
jsancio commented on code in PR #15912: URL: https://github.com/apache/kafka/pull/15912#discussion_r1596977565 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -256,7 +256,7 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) { } private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { -// Confirm that it is at most one batch and it is a control record +// Confirm that it is one control batch and it is at least one control record Review Comment: Fixed.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104915418 @edoardocomar In general connectors do have to add a configuration like this eventually, because users have different tolerances for errors. Some users want the errors to cause the connector to become FAILED, so that they can see the exception in the REST API and retry it explicitly. Other users want the connector to retry internally infinitely, and not fail for any reason. MM2 has a _lot_ of operations that can fail, and virtually none of them cause the connector to fail. The reason for this is that MM2 has dedicated mode, where there isn't a REST API to surface errors or perform external retries, so external retries are very expensive. It is definitely something that could be fixed eventually with a "strict mode" configuration or similar. We've also considered ways to address this from the framework side, with retry policies and automatic restarts, but none of that has been fully designed or implemented yet. I think we should not block this fix on solving that more general problem. If there is a permissions error loading the checkpoints, MM2 should log that, and then degrade gracefully to the current behavior. We can have a KIP that adds a "strict mode" to make this failure surface, to make this new permission required. In practical terms, without a configuration and with the graceful degradation implementation, we can get this into 3.8. If you're interested in the configuration, that will delay this feature until 4.0. I'm fine with either, but I think the current behavior has caused such considerable friction in the community that we should prefer a 3.8 release.
[PR] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException [kafka]
wcarlson5 opened a new pull request, #15919: URL: https://github.com/apache/kafka/pull/15919 KStreams is able to handle the ProducerFenced (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance amongst the worker threads to rejoin the group. The producer is also recreated. Due to how streams works (writing to and reading from various topics), the application is able to figure out the last thing the fenced producer completed and continue from there. InvalidPidMappingException should be treated the same way. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson updated KAFKA-16699: --- Description: KStreams is able to handle the ProducerFenced (among other errors) cleanly. It does this by closing the task dirty and triggering a rebalance amongst the worker threads to rejoin the group. The producer is also recreated. Due to how streams works (writing to and reading from various topics), the application is able to figure out the last thing the fenced producer completed and continue from there. KStreams EOS V2 also trusts that any open transaction (including those whose producer is fenced) will be aborted by the server. This is a key factor in how it is able to operate. In EOS V1, the new InitProducerId fences and aborts the previous transaction. In either case, we are able to reason about the last valid state from the fenced producer and how to proceed. h2. InvalidPidMappingException ≈ ProducerFenced I argue that InvalidPidMappingException can be handled in the same way. Let me explain why. There are two cases we see this error: # {{txnManager.getTransactionState(transactionalId).flatMap { case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)}} # {{if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING)}} h3. Case 1 We are missing a value in the transactional state map for the transactional ID. Under normal operations, this is only possible when the transactional ID expires via the mechanism described above after {{transactional.id.expiration.ms}} of inactivity. In this case, there is no state that needs to be reconciled. It is safe to just rebalance and rejoin the group with a new producer. We probably don’t even need to close the task dirty, but it doesn’t hurt to do so. h3. Case 2 This is a bit more interesting. 
It says that we have transactional state, but the producer ID in the request does not match the producer ID associated with the transactional ID on the broker. How can this happen? It is possible that a new producer instance B with the same transactional ID was created after the transactional state expired for instance A. Given there is no state on the server when B joins, it will get a totally new producer ID. If the original producer A comes back, it will have state for this transactional ID but the wrong producer ID. In this case, the old producer ID is fenced, it’s just that the normal epoch-based fencing logic doesn’t apply. We can treat it the same, however. h2. Summary As described in the cases above, any time we encounter the InvalidPidMapping during normal operation, the previous producer was either finished with its operations or was fenced. Thus, it is safe to close the task dirty and rebalance + rejoin the group just as we do with the ProducerFenced exception.
[jira] [Created] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException
Walker Carlson created KAFKA-16699: -- Summary: Have Streams treat InvalidPidMappingException like a ProducerFencedException Key: KAFKA-16699 URL: https://issues.apache.org/jira/browse/KAFKA-16699 Project: Kafka Issue Type: Improvement Components: streams Reporter: Walker Carlson
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1596969230 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) } // visible for testing -synchronized RequestFuture<Void> sendHeartbeatRequest() { +public synchronized RequestFuture<Void> sendHeartbeatRequest() { Review Comment: This change is no longer valid. I was trying out some things to make the `WorkerCoordinator` test work and this came through because of that. Will remove.
Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]
chia7712 commented on PR #15904: URL: https://github.com/apache/kafka/pull/15904#issuecomment-2104903142 > Have the 30 test failures been triaged?

|test|jira|
|---|---|
|testSyncTopicConfigs|https://issues.apache.org/jira/browse/KAFKA-15945|
|testReplicateSourceDefault|https://issues.apache.org/jira/browse/KAFKA-15292|
|testProduceConsumeWithWildcardAcls|https://issues.apache.org/jira/browse/KAFKA-16697|
|testFenceMultipleBrokers|https://issues.apache.org/jira/browse/KAFKA-16634|
|testSeparateOffsetsTopic|https://issues.apache.org/jira/browse/KAFKA-14089|
|testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl|https://issues.apache.org/jira/browse/KAFKA-8250|
|testDescribeQuorumReplicationSuccessful|https://issues.apache.org/jira/browse/KAFKA-15104|
|testDescribeQuorumStatusSuccessful|https://issues.apache.org/jira/browse/KAFKA-16174|
|testTaskRequestWithOldStartMsGetsUpdated|https://issues.apache.org/jira/browse/KAFKA-16136|
|testConsumptionWithBrokerFailures|https://issues.apache.org/jira/browse/KAFKA-15146|
|testCoordinatorFailover|https://issues.apache.org/jira/browse/KAFKA-16024|
|testBrokerHeartbeatDuringMigration|https://issues.apache.org/jira/browse/KAFKA-15963|
|testAbortTransactionTimeout|https://issues.apache.org/jira/browse/KAFKA-15772|
|testMultiConsumerStickyAssignor|https://issues.apache.org/jira/browse/KAFKA-15934|
|testDynamicIpConnectionRateQuota|https://issues.apache.org/jira/browse/KAFKA-16698|
|testCreateClusterAndPerformReassignment|https://issues.apache.org/jira/browse/KAFKA-15103|
|shouldBootstrapTwoBrokersWithFollowerThrottle|https://issues.apache.org/jira/browse/KAFKA-4184|

Besides, the thread leaks should be fixed by #15886. Hence, I will rebase code to trigger QA again.
[jira] [Created] (KAFKA-16698) Fix flaky kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota
Chia-Ping Tsai created KAFKA-16698: -- Summary: Fix flaky kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota Key: KAFKA-16698 URL: https://issues.apache.org/jira/browse/KAFKA-16698 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai {code:java} org.opentest4j.AssertionFailedError: Timed out waiting for connection rate update to propagate at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138) at app//kafka.network.DynamicConnectionQuotaTest.updateIpConnectionRate(DynamicConnectionQuotaTest.scala:279) at app//kafka.network.DynamicConnectionQuotaTest.testDynamicIpConnectionRateQuota(DynamicConnectionQuotaTest.scala:255) at java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) {code}
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845421#comment-17845421 ] Philip Nee commented on KAFKA-16687: "Looks like it's caused by this" - mind elaborating? > Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0 > > > Key: KAFKA-16687 > URL: https://issues.apache.org/jira/browse/KAFKA-16687 > Project: Kafka > Issue Type: Bug > Components: clients, consumer > Affects Versions: 3.7.0 > Reporter: FTR > Assignee: Philip Nee > Priority: Major
Re: [PR] KafkaDockerWrapper: correct KAFKA_HEAP_OPTS server property [kafka]
omkreddy merged PR #15345: URL: https://github.com/apache/kafka/pull/15345
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
gharris1727 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1596961414 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception { ); } +@Test +public void testPollTimeoutExpiry() throws Exception { +// This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on +// task#stop method which is blocked. The timeouts have been set accordingly +workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20))); +workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40))); +connect = connectBuilder +.numBrokers(1) +.numWorkers(1) +.build(); + +connect.start(); + +connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time"); + +Map connectorWithBlockingTaskStopConfig = new HashMap<>(); +connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName()); +connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1"); + connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP)); + +connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig); + +connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( +CONNECTOR_NAME, 1, "connector and tasks did not start in time" +); + +try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { +connect.restartTask(CONNECTOR_NAME, 0); +TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) && Review Comment: Could this assertion be added to an existing BlockingConnectorTest? The blocking plugins are inherently slow to use so we should avoid adding more instances of them. 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, RequestFuture future) } // visible for testing -synchronized RequestFuture sendHeartbeatRequest() { +public synchronized RequestFuture sendHeartbeatRequest() { Review Comment: I would respect the "visible for testing" comment above, and leave this package-local. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java: ## @@ -267,6 +268,20 @@ public String memberId() { return JoinGroupRequest.UNKNOWN_MEMBER_ID; } +@Override +protected void handlePollTimeoutExpiry() { +Stage currentStage = listener.onPollTimeoutExpiry(); +log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " + +"in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " + Review Comment: From a user perspective, the class and method names are irrelevant, and bringing up irrelevant details in diagnostics can be misleading. "The last thing the worker was doing was: {} and may contribute to this timeout" is much more understandable and still gets the point across. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection connectors, Collection connectors, Collection tasks); + + +/** + * Invoked when a worker experiences a poll timeout expiry. Invoking this method allows getting + * the stage which was currently being executed when the poll timeout happened. The default implementation + * returns null + * @return The current stage being executed. Could be null + */ +default Stage onPollTimeoutExpiry() { Review Comment: This is an internal interface, unless this default method actually makes sense on it's own I wouldn't add it. -- This is an automated message from the Apache Git Service. 
Re: [PR] MINOR; Validate at least one control record [kafka]
junrao commented on code in PR #15912: URL: https://github.com/apache/kafka/pull/15912#discussion_r1596962906 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -256,7 +256,7 @@ public void appendControlMessages(MemoryRecordsCreator valueCreator) { } private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { -// Confirm that it is at most one batch and it is a control record +// Confirm that it is one control batch and it is at least one control record Review Comment: validateMemoryRecordAndReturnCount => validateMemoryRecordsAndReturnCount memoryRecord => memoryRecords Also, there is an existing typo creatte on line 268.
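The checks the renamed `validateMemoryRecordsAndReturnCount` is meant to perform (exactly one batch, that batch is a control batch, and it holds at least one record) can be distilled into a minimal sketch. The class and parameter names below are illustrative only, not Kafka's actual `BatchAccumulator` code:

```java
// Hypothetical distillation of the validation discussed above. Real code
// would inspect a MemoryRecords buffer; here the three properties are
// passed in directly to keep the sketch self-contained.
public class ControlBatchValidator {
    public static int validateAndCount(boolean isControlBatch, int batchCount, int recordCount) {
        if (batchCount != 1) {
            throw new IllegalArgumentException("Expected exactly one batch, got " + batchCount);
        }
        if (!isControlBatch) {
            throw new IllegalArgumentException("Expected a control batch");
        }
        if (recordCount < 1) {
            throw new IllegalArgumentException("Expected at least one control record, got " + recordCount);
        }
        return recordCount;
    }

    public static void main(String[] args) {
        // A single control batch with three records passes validation.
        System.out.println(validateAndCount(true, 1, 3));
    }
}
```

The `recordCount < 1` branch corresponds to the extra check added in the diff discussed later in this thread.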
[jira] [Created] (KAFKA-16697) Fix flaky kafka.api.SaslGssapiSslEndToEndAuthorizationTest#testProduceConsumeWithWildcardAcls
Chia-Ping Tsai created KAFKA-16697: -- Summary: Fix flaky kafka.api.SaslGssapiSslEndToEndAuthorizationTest#testProduceConsumeWithWildcardAcls Key: KAFKA-16697 URL: https://issues.apache.org/jira/browse/KAFKA-16697 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai {code:java} org.opentest4j.AssertionFailedError: Should have been zero expired connections killed: 1(total=0.0) ==> expected: <0> but was: <1> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:664) at app//kafka.api.EndToEndAuthorizationTest.$anonfun$confirmReauthenticationMetrics$1(EndToEndAuthorizationTest.scala:202) at app//kafka.api.EndToEndAuthorizationTest.$anonfun$confirmReauthenticationMetrics$1$adapted(EndToEndAuthorizationTest.scala:200) at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576) at app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933) at app//kafka.api.EndToEndAuthorizationTest.confirmReauthenticationMetrics(EndToEndAuthorizationTest.scala:200) at app//kafka.api.EndToEndAuthorizationTest.testProduceConsumeWithWildcardAcls(EndToEndAuthorizationTest.scala:236) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChai {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]
johnnychhsu commented on PR #15906: URL: https://github.com/apache/kafka/pull/15906#issuecomment-2104887677 @C0urante thanks for the review! Just updated it.
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1596953771 ## raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java: ## @@ -79,20 +81,79 @@ void testRemoveVoter() { ); } +@Test +void testIsVoterWithDirectoryId() { +Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), true); +VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + +assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey())); +assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty(; +assertFalse( +voterSet.isVoter(ReplicaKey.of(2, aVoterMap.get(1).voterKey().directoryId())) +); +assertFalse( +voterSet.isVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId())) +); +assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty(; +} + +@Test +void testIsVoterWithoutDirectoryId() { +Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), false); +VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + +assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty(; +assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty(; +} + +@Test +void testStandaloneAndIsOnlyVoter() { +Map aVoterMap = voterMap(Arrays.asList(1), true); +VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + +assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey())); +assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.empty(; +assertFalse( +voterSet.isOnlyVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId())) +); +assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(4, Optional.empty(; +} + +@Test +void testNotStandaloneAndIsOnlyVoter() { Review Comment: Fixed. 
Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]
jsancio commented on code in PR #15859: URL: https://github.com/apache/kafka/pull/15859#discussion_r1596953465 ## raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java: ## @@ -73,23 +88,35 @@ private QuorumState buildQuorumState(Set voters) { ); } -@Test -public void shouldRecordVoterQuorumState() { -QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2)); +@ParameterizedTest +@ValueSource(shorts = {0, 1}) +public void shouldRecordVoterQuorumState(short kraftVersion) { +boolean withDirectoryId = kraftVersion > 0; +Map voterMap = VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId); +voterMap.put(localId, VoterSetTest.voterNode(ReplicaKey.of(localId, Optional.of(localDirectoryId; Review Comment: Yes. Fixed. Excuse the minor errors. ## raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java: ## @@ -79,20 +81,79 @@ void testRemoveVoter() { ); } +@Test +void testIsVoterWithDirectoryId() { +Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), true); +VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + +assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey())); +assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty(; +assertFalse( +voterSet.isVoter(ReplicaKey.of(2, aVoterMap.get(1).voterKey().directoryId())) +); +assertFalse( +voterSet.isVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId())) +); +assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty(; +} + +@Test +void testIsVoterWithoutDirectoryId() { +Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), false); +VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap)); + +assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty(; +assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.of(Uuid.randomUuid(); +assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty(; +} + +@Test 
+void testStandaloneAndIsOnlyVoter() { Review Comment: Fixed.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
gharris1727 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1596952044 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java: ## @@ -267,6 +267,18 @@ public String memberId() { return JoinGroupRequest.UNKNOWN_MEMBER_ID; } +@Override +protected void handlePollTimeoutExpiry() { Review Comment: Yeah, the tick thread stage is a best effort and can be misleading. However, if something has blocked long enough to cause a poll timeout, it's likely enough to continue for the additional time it takes for the heartbeat thread to notice. The ideal use-case I see is that this error pops up in a worker log 3 or more times before an operator has a chance to remediate it, and if the majority of the logs blame connector-xyz, the operator can STOP that connector.
Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]
johnnychhsu commented on code in PR #15906: URL: https://github.com/apache/kafka/pull/15906#discussion_r1596951379 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -353,6 +353,9 @@ private void awaitTaskConfigurations(MirrorMaker mm, .map(TaskInfo::config) .allMatch(predicate); } catch (Exception ex) { +if (ex instanceof RebalanceNeededException) { +throw ex; +} Review Comment: you are right, let me add the comment, thanks!
Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]
johnnychhsu commented on code in PR #15906: URL: https://github.com/apache/kafka/pull/15906#discussion_r1596951056 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java: ## @@ -260,9 +261,8 @@ public void testMultiNodeCluster() throws Exception { awaitConnectorTasksStart(mirrorMakers.get("node 0"), MirrorHeartbeatConnector.class, sourceAndTarget); // Create one topic per Kafka cluster per MirrorMaker node -final int topicsPerCluster = numNodes; Review Comment: thanks for the review! totally agree, let me move it
[PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah opened a new pull request, #15918: URL: https://github.com/apache/kafka/pull/15918 When becoming the active KRaftMigrationDriver, there is another race condition similar to KAFKA-16171. This time, the race is due to a stale read from ZK. After writing to `/controller` and `/controller_epoch`, it is possible that a read on `/migration` is not linear with the writes that were just made. In other words, we get a stale read on `/migration`. This leads to an inability to sync metadata to ZK due to an incorrect zkVersion on the migration ZNode. The non-linearizability of reads is in fact documented behavior for ZK, so we need to handle it. To fix the stale read, this patch adds a write to `/migration` after updating `/controller` and `/controller_epoch`. This allows us to learn the correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER state. This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness. Writes to ZK are gated on the `/controller_epoch` zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover, there is a lot of processing happening on the controller (i.e., a full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail. There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to past tense to reflect the fact that the operations have already happened by the time the log message is created.
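The core of the fix described above — write to `/migration` first so that a later versioned update uses a freshly learned zkVersion rather than a possibly stale read — can be illustrated with a toy in-memory versioned store. This is a sketch of the idea only, not ZooKeeper client code; all names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of ZK's versioned znodes: an unconditional write returns the new
// version, and a conditional write fails on version mismatch (like ZK's
// setData with an expected version returning BadVersion).
public class VersionedStore {
    private final Map<String, Integer> versions = new HashMap<>();

    /** Unconditional write; returns the node's new version. */
    public int write(String path) {
        int v = versions.getOrDefault(path, 0) + 1;
        versions.put(path, v);
        return v;
    }

    /** Compare-and-set style write, analogous to ZK's versioned setData. */
    public boolean checkedWrite(String path, int expectedVersion) {
        if (versions.getOrDefault(path, 0) != expectedVersion) {
            return false; // analogous to a BadVersion error
        }
        versions.put(path, expectedVersion + 1);
        return true;
    }

    public static void main(String[] args) {
        VersionedStore zk = new VersionedStore();
        zk.write("/migration"); // a prior controller bumped the node; a stale read might still see version 0
        int staleVersion = 0;
        System.out.println(zk.checkedWrite("/migration", staleVersion)); // fails: stale zkVersion
        int freshVersion = zk.write("/migration"); // the fix: write first to learn the real version
        System.out.println(zk.checkedWrite("/migration", freshVersion)); // succeeds
    }
}
```

In the sketch, the conditional write with the stale version fails, while writing first and reusing the returned version succeeds — mirroring why the patch writes to `/migration` before leaving BECOME_CONTROLLER.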
Re: [PR] MINOR; Validate at least one control record [kafka]
jsancio commented on PR #15912: URL: https://github.com/apache/kafka/pull/15912#issuecomment-2104840600 @junrao @chia7712 thanks for the reviews. PR is ready for another round.
Re: [PR] MINOR; Validate at least one control record [kafka]
jsancio commented on code in PR #15912: URL: https://github.com/apache/kafka/pull/15912#discussion_r1596923121 ## raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java: ## @@ -284,6 +284,8 @@ private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) { ); } else if (numberOfRecords == null) { throw new IllegalArgumentException("valueCreator didn't create a batch with the count"); +} else if (numberOfRecords < 1) { Review Comment: Added the test.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104835965 Hi @gharris1727, we're now handling errors when loading the checkpoints topic (we still have to add unit tests). Specifically, we tested the not-authorized-to-read case, which the existing KafkaBasedLog was not handling well. At this stage the task start would fail, which seems to us an improvement, as it is detectable and actionable (we'd expect the change to be noted in the release notes). This looks to us like better behavior than reverting to the old one on failure, as maintaining and testing two modes of operation seems too complex. Do you still think we need a KIP to introduce yet another config to choose between the old behavior (default) and the new one (arguably better, in the eyes of this PR's authors ...)?
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
AndrewJSchofield commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1596861973 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) { * member to {@link MemberState#JOINING}, so that it rejoins the group. */ public void resetPollTimer(final long pollMs) { +pollTimer.update(pollMs); if (pollTimer.isExpired()) { -logger.debug("Poll timer has been reset after it had expired"); +logger.warn("Time between subsequent calls to poll() was longer than the configured" + +"max.poll.interval.ms, exceeded by %s ms. This typically implies that the " + Review Comment: `{}`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) { * member to {@link MemberState#JOINING}, so that it rejoins the group. */ public void resetPollTimer(final long pollMs) { +pollTimer.update(pollMs); if (pollTimer.isExpired()) { Review Comment: I would rather have a method added to `Timer` such as `long hasExpiredBy()` so the check for expiration and the calculation of by how much is encapsulated in the timer itself.
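The reviewer's suggested `hasExpiredBy()`-style helper could look roughly like the sketch below, which reports not just whether the poll timer expired but by how many milliseconds the deadline was overshot. The class and method names here are assumptions for illustration, not the actual `org.apache.kafka.common.utils.Timer` API:

```java
// Hypothetical sketch of a timer that encapsulates both the expiry check and
// the overshoot calculation, as suggested in the review above.
public class PollTimerSketch {
    private final long deadlineMs;

    public PollTimerSketch(long startMs, long timeoutMs) {
        this.deadlineMs = startMs + timeoutMs;
    }

    /** Returns 0 if the timer has not expired, otherwise the overshoot in ms. */
    public long expiredBy(long nowMs) {
        return Math.max(0, nowMs - deadlineMs);
    }

    public static void main(String[] args) {
        // Illustrative 5-minute interval, echoing the default max.poll.interval.ms.
        PollTimerSketch timer = new PollTimerSketch(0, 300_000);
        System.out.println(timer.expiredBy(250_000)); // not yet expired
        System.out.println(timer.expiredBy(305_000)); // expired, overshoot in ms
    }
}
```

A single call site can then log the overshoot directly when the returned value is positive, instead of pairing `isExpired()` with a separate subtraction.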
[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0
[ https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845402#comment-17845402 ] FTR commented on KAFKA-16687: - Looks like it's caused by this. I found some Sensor instances in the heap dump I took. It looks good now: I moved the KafkaConsumer instance out of the class I had used to encapsulate it, and this issue was resolved. No native memory leak now!
[jira] [Resolved] (KAFKA-16688) SystemTimer leaks resources on close
[ https://issues.apache.org/jira/browse/KAFKA-16688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16688. - Resolution: Fixed > SystemTimer leaks resources on close > > > Key: KAFKA-16688 > URL: https://issues.apache.org/jira/browse/KAFKA-16688 > Project: Kafka > Issue Type: Test >Affects Versions: 3.8.0 >Reporter: Gaurav Narula >Assignee: Gaurav Narula >Priority: Major > > We observe some thread leaks with thread name {{executor-client-metrics}}. > This may happen because {{SystemTimer}} doesn't attempt to shut down its > executor service properly. > Refer: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15885/1/tests > and tests with {{initializationError}} in them for stacktrace
Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]
chia7712 commented on code in PR #15917: URL: https://github.com/apache/kafka/pull/15917#discussion_r1596894830 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java: ## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; + +class RemoteLogMetadataManagerTestUtils { Review Comment: > Will the TopicBasedRemoteLogMetadataManagerHarness class be removed? Yep, The new test way is only applied to one test class in this PR. I plan to rewrite all tests of storage after this PR gets approved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
chia7712 commented on PR #15911: URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104790646 > Can you actually not remove this because I was starting to use it in the GetOffsetShellToolTest? The problem I faced with using the topic-based RLMM is that it requires the bootstrap-servers to initialise correctly. However, said bootstrap-servers are not present at initialisation time (due to determining the port dynamically) @clolov Could you take a look at #15917? I'm trying another way to write tests for the storage module. The `ClusterInstance` gets created and the port is bound during the test phase, so maybe that can fix the problem you described.
Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]
kamalcph commented on code in PR #15917: URL: https://github.com/apache/kafka/pull/15917#discussion_r1596873472 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java: ## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; + +class RemoteLogMetadataManagerTestUtils { Review Comment: Will the `TopicBasedRemoteLogMetadataManagerHarness` class be removed?
Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
kamalcph commented on PR #15911: URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104768094 > Can you actually not remove this because I was starting to use it in the GetOffsetShellToolTest? The problem I faced with using the topic-based RLMM is that it requires the bootstrap-servers to initialise correctly. However, said bootstrap-servers are not present at initialisation time (due to determining the port dynamically) We can get the port from the KafkaBroker using the `boundPort` method. If it does not work for you, could you please open a draft PR? I'll take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
clolov commented on PR #15911: URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104745407 Can you actually not remove this because I was starting to use it in the GetOffsetShellTool? The problem I faced with using the topic-based RLMM is that it requires the bootstrap-servers to start correctly. However, said bootstrap-servers are not present at initialisation time (due to determining the port dynamically) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
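The ordering problem described above — a component needs `bootstrap.servers` at construction time, but the broker's port is only known after startup — is commonly solved by deferring resolution to configure time. A minimal, generic sketch (not Kafka's harness code; all names here are illustrative stand-ins):

```java
import java.util.function.Supplier;

public class LazyBootstrapDemo {
    // Hypothetical stand-in for a metadata manager that needs bootstrap servers
    static class MetadataManager {
        private final Supplier<String> bootstrapServers; // resolved on configure(), not at construction
        MetadataManager(Supplier<String> bootstrapServers) {
            this.bootstrapServers = bootstrapServers;
        }
        String configure() {
            // The supplier is only invoked now, after the broker has bound its port
            return "bootstrap.servers=" + bootstrapServers.get();
        }
    }

    public static void main(String[] args) {
        final int[] boundPort = {0};          // port unknown when the manager is created
        MetadataManager mgr = new MetadataManager(() -> "localhost:" + boundPort[0]);
        boundPort[0] = 9092;                  // broker startup binds the port later
        System.out.println(mgr.configure());  // bootstrap.servers=localhost:9092
    }
}
```

This mirrors the suggestion in the reply above: query the broker's bound port lazily once startup has completed, rather than requiring it up front.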
[jira] [Comment Edited] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845380#comment-17845380 ] Johnson Okorie edited comment on KAFKA-16692 at 5/10/24 2:44 PM: - Thanks [~jolshan], looking forward to see your findings! was (Author: JIRAUSER305348): Thanks [~jolshan], looking forward to see you findings! > InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled when upgrading from kafka 3.5 to 3.6 > > > Key: KAFKA-16692 > URL: https://issues.apache.org/jira/browse/KAFKA-16692 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.1 >Reporter: Johnson Okorie >Assignee: Justine Olshan >Priority: Major > > We have a kafka cluster running on version 3.5.2 that we are upgrading to > 3.6.1. This cluster has a lot of clients with exactly one semantics enabled > and hence creating transactions. As we replaced brokers with the new > binaries, we observed lots of clients in the cluster experiencing the > following error: > {code:java} > 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, > transactionalId=] Got error produce response with > correlation id 6402937 on topic-partition , retrying > (2147483512 attempts left). Error: NETWORK_EXCEPTION. 
Error Message: The > server disconnected before a response was received.{code} > On inspecting the broker, we saw the following errors on brokers still > running Kafka version 3.5.2: > > {code:java} > message: > Closing socket for because of error > exception_exception_class: > org.apache.kafka.common.errors.InvalidRequestException > exception_exception_message: > Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not > enabled > exception_stacktrace: > org.apache.kafka.common.errors.InvalidRequestException: Received request api > key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled > {code} > On the new brokers running 3.6.1 we saw the following errors: > > {code:java} > [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for > node 1043 with a network exception.{code} > > I can also see this : > {code:java} > [AddPartitionsToTxnManager broker=1055]Cancelled in-flight > ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 > being disconnected (elapsed time since creation: 11ms, elapsed time since > send: 4ms, request timeout: 3ms){code} > We started investigating this issue and digging through the changes in 3.6, > we came across some changes introduced as part of KAFKA-14402 that we thought > might lead to this behaviour. > First we could see that _transaction.partition.verification.enable_ is > enabled by default and enables a new code path that culminates in we sending > version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated > [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269]. 
> From a > [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] > on the mailing list, [~jolshan] pointed out that this scenario shouldn't be > possible as the following code paths should prevent version 4 > ADD_PARTITIONS_TO_TXN requests being sent to other brokers: > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130] > > [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195] > However, these requests are still sent to other brokers in our environment. > On further inspection of the code, I am wondering if the following code path > could lead to this issue: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500] > In this scenario, we don't have any _NodeApiVersions_ available for the > specified nodeId and potentially skipping the _latestUsableVersion_ check. I > am wondering if it is possible that because _discoverBrokerVersions_ is set > to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it > skips fetching {_}NodeApiVersions{_}? I can see that we create the network > client here: > [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641] > The _NetworkUtils.buildNetworkClient_ method seems to create a network client > that has _discoverBrokerVersions_ set to {_}false{_}. > I was hoping I could get some assistance debugging this issue. Happy to > provide any additional information needed. -- This message was sent by Atlassian Jira
[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6
[ https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845380#comment-17845380 ] Johnson Okorie commented on KAFKA-16692: Thanks [~jolshan], looking forward to see you findings! -- This message was sent by Atlassian Jira
Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]
lianetm commented on code in PR #15897: URL: https://github.com/apache/kafka/pull/15897#discussion_r1596833496 ## core/src/test/java/kafka/test/annotation/ClusterTest.java: ## @@ -33,7 +33,7 @@ @Retention(RUNTIME) @TestTemplate public @interface ClusterTest { -Type clusterType() default Type.DEFAULT; +Type[] clusterTypes() default {}; Review Comment: `ClusterTestDefaults` makes sense to me. Honestly I wasn't thinking of it when suggesting having a default with all the types, simply because I didn't remember we had it. My point was more about having a default with all types, agree with you that the right place for that default should be ClusterTestDefaults -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
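The fallback being discussed — an empty `clusterTypes()` on `@ClusterTest` deferring to the class-level `@ClusterTestDefaults` — can be sketched with plain annotation reflection. The annotations below are self-contained stand-ins for the test-infra ones (string type names instead of the real `Type` enum), not Kafka's actual code:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class ClusterTypesDefaultDemo {
    // Hypothetical stand-in for kafka.test.annotation.ClusterTest
    @Retention(RetentionPolicy.RUNTIME)
    @interface ClusterTest {
        String[] clusterTypes() default {}; // empty => defer to ClusterTestDefaults
    }

    // Hypothetical stand-in for kafka.test.annotation.ClusterTestDefaults
    @Retention(RetentionPolicy.RUNTIME)
    @interface ClusterTestDefaults {
        String[] clusterTypes() default {"ZK", "KRAFT", "CO_KRAFT"}; // "all types" lives here
    }

    // A test with no explicit types runs on every type declared in the defaults
    static String[] effectiveTypes(String[] fromTest, String[] fromDefaults) {
        return fromTest.length == 0 ? fromDefaults : fromTest;
    }

    public static void main(String[] args) throws Exception {
        String[] testDefault = (String[]) ClusterTest.class
                .getMethod("clusterTypes").getDefaultValue();
        String[] classDefault = (String[]) ClusterTestDefaults.class
                .getMethod("clusterTypes").getDefaultValue();
        // Empty per-test default falls through to the class-level default
        System.out.println(effectiveTypes(testDefault, classDefault).length); // 3
    }
}
```

This also illustrates the reviewers' cleanup requests above: an annotation value that merely repeats the default is redundant and can be dropped.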
Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]
KevinZTW commented on PR #15884: URL: https://github.com/apache/kafka/pull/15884#issuecomment-2104711204 > @KevinZTW Please fix the build error sorry I just fixed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16264: Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration [kafka]
jeqo commented on PR #15888: URL: https://github.com/apache/kafka/pull/15888#issuecomment-2104695522 Thanks, @jolshan ! I missed that one, and helped me to find a couple of missing steps. PTAL and let's confirm if this requires a small KIP to merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on PR #15909: URL: https://github.com/apache/kafka/pull/15909#issuecomment-2104687487 Done, so I simplified what we log when the background thread realizes time's up and leaves the group to rejoin eventually (that's all the relevant info at that point). I then moved the log that details the expired max.poll.interval to the place where we can give a more accurate exceeded time, which is on the next app poll event that the background handles. Also updated the test to make sure it checks not only how the exceed time is calculated, but also **where** it is calculated. Makes sense? More accurate now indeed, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16445) PATCH method for connector configuration
[ https://issues.apache.org/jira/browse/KAFKA-16445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-16445: -- Fix Version/s: 3.8.0 > PATCH method for connector configuration > > > Key: KAFKA-16445 > URL: https://issues.apache.org/jira/browse/KAFKA-16445 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Ivan Yurchenko >Assignee: Ivan Yurchenko >Priority: Minor > Fix For: 3.8.0 > > > As [KIP-477: Add PATCH method for connector config in Connect REST > API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API] > suggests, we should introduce the PATCH method for connector configuration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
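Per KIP-477, `PATCH /connectors/{name}/config` merges the submitted keys into the connector's existing configuration, unlike `PUT`, which replaces it wholesale. A minimal client-side sketch with `java.net.http` (the worker URL and connector name are made-up placeholders; the request is only built here, not sent to a live worker):

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class PatchConnectorConfigDemo {
    public static HttpRequest patchRequest(String workerUrl, String connector, String jsonBody) {
        // HttpURLConnection cannot issue PATCH; java.net.http.HttpRequest can via method()
        return HttpRequest.newBuilder()
                .uri(URI.create(workerUrl + "/connectors/" + connector + "/config"))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Only the keys being changed are submitted; the rest of the config is untouched
        HttpRequest req = patchRequest("http://localhost:8083", "my-connector",
                "{\"tasks.max\": \"2\"}");
        System.out.println(req.method() + " " + req.uri());
        // prints: PATCH http://localhost:8083/connectors/my-connector/config
    }
}
```

To send the request against a real worker, pass it to `HttpClient.newHttpClient().send(...)`; the endpoint shape follows the KIP linked in the issue above.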
Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]
lianetm commented on code in PR #15909: URL: https://github.com/apache/kafka/pull/15909#discussion_r1596716965 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -193,11 +193,12 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { } pollTimer.update(currentTimeMs); if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { -logger.warn("Consumer poll timeout has expired. This means the time between " + +logger.warn("Consumer poll timeout has expired, exceeded by {} ms. This means the time between " + Review Comment: Hey, good point, it would actually take this a step further, where indeed should be more useful. As @AndrewJSchofield pointed, the HB manager will notice sooner in practice (even sooner than the HB interval), but we do know when the next poll happens, so can definitely get a more accurate exceed time (in-between calls to poll, which translates to poll events handled in this same manager). On it...thanks for the comments! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]
jeqo commented on PR #15893: URL: https://github.com/apache/kafka/pull/15893#issuecomment-2104551065 @C0urante thanks! Agree, we should leave this change of behavior out of the scope of this PR/KIP. I have returned to the previous behavior, PTAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13473) Log cleaner Dynamic configs aren't applied after a restart
[ https://issues.apache.org/jira/browse/KAFKA-13473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeel Jotaniya reassigned KAFKA-13473: - Assignee: Jeel Jotaniya > Log cleaner Dynamic configs aren't applied after a restart > -- > > Key: KAFKA-13473 > URL: https://issues.apache.org/jira/browse/KAFKA-13473 > Project: Kafka > Issue Type: Bug > Components: config, core >Affects Versions: 2.8.1 >Reporter: Tim Patterson >Assignee: Jeel Jotaniya >Priority: Minor > > Upon restarting kafka, dynamically configured log cleaner configs aren't > picked up and applied. > > Here are some logs from a local kafka when I up the threads to 2 using the > kafka-config tool - Noting the last 2 lines where it starts up 2 log cleaner > threads. > > {code:java} > [2021-11-23 21:09:50,044] INFO [Admin Manager on Broker 1001]: Updating > brokers with new configuration : log.cleaner.threads -> 2 > (kafka.server.ZkAdminManager) > [2021-11-23 21:09:50,092] INFO Processing override for entityPath: > brokers/ with config: HashMap(log.cleaner.threads -> 2) > (kafka.server.DynamicConfigManager) log.cleaner.threads = 2 > [2021-11-23 21:09:50,113] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner) > [2021-11-23 21:09:50,114] INFO [kafka-log-cleaner-thread-0]: Shutting down > (kafka.log.LogCleaner) > [2021-11-23 21:09:50,116] INFO [kafka-log-cleaner-thread-0]: Stopped > (kafka.log.LogCleaner) > [2021-11-23 21:09:50,116] INFO [kafka-log-cleaner-thread-0]: Shutdown > completed (kafka.log.LogCleaner) > [2021-11-23 21:09:50,119] INFO Starting the log cleaner (kafka.log.LogCleaner) > [2021-11-23 21:09:50,178] INFO [kafka-log-cleaner-thread-0]: Starting > (kafka.log.LogCleaner) > [2021-11-23 21:09:50,181] INFO [kafka-log-cleaner-thread-1]: Starting > (kafka.log.LogCleaner){code} > And now after a restart, at no point does it ever start 2 threads, even > though it clearly knows about the configs > > {code:java} > [2021-11-23 21:10:46,659] INFO Starting the log cleaner (kafka.log.LogCleaner) > [2021-11-23 21:10:46,723] INFO [kafka-log-cleaner-thread-0]: Starting > (kafka.log.LogCleaner) > [2021-11-23 21:10:48,124] INFO Processing override for entityPath: > brokers/ with config: HashMap(log.cleaner.threads -> 2) > (kafka.server.DynamicConfigManager) log.cleaner.backoff.ms = 15000 > log.cleaner.dedupe.buffer.size = 15000 log.cleaner.delete.retention.ms = > 8640 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 > log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = > 1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = > 9223372036854775807 log.cleaner.min.cleanable.ratio = 0.5 > log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 2{code} > > When investigating from the kafka config tool all looks well. 
> > {code:java} > kafka-configs --bootstrap-server $BROKER_URL --entity-type brokers > --entity-default --describe --all | grep log.cleaner.threads > log.cleaner.threads=2 sensitive=false > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:log.cleaner.threads=2}{code} > > But if you try change the config you soon find out that all is not well(note > here it mentions the current value is 1 in the validation message) > > {code:java} > kafka-configs --bootstrap-server $BROKER_URL --entity-type brokers > --entity-default --alter --add-config log.cleaner.threads=3 Error while > executing config command with args '--bootstrap-server profile_kafka:9093 > --entity-type brokers --entity-default --alter --add-config > log.cleaner.threads=3' java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.InvalidRequestException: Invalid config value > for resource ConfigResource(type=BROKER, name=''): Invalid value > org.apache.kafka.common.config.ConfigException: Log cleaner threads cannot be > increased to more than double the current value 1 for configuration Invalid > dynamic configuration{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]
chia7712 commented on PR #15884: URL: https://github.com/apache/kafka/pull/15884#issuecomment-2104544789 @KevinZTW Please fix the build error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]
chia7712 opened a new pull request, #15917: URL: https://github.com/apache/kafka/pull/15917 This is the first PR used to migrate tests of storage to new test infra. With the new test infra, we can make those tests run on either zk or kraft cluster easily. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16654:Refactor kafka.test.annotation.Type and ClusterTestExtensions [kafka]
TaiJuWu opened a new pull request, #15916: URL: https://github.com/apache/kafka/pull/15916 *More detailed description of your change, As title. *Summary of testing strategy (including rationale) local test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1596621705 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java: ## @@ -76,7 +76,7 @@ public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) t } /** - * Assert that at least the requested number of workers are up and running. + * Assert that the exact number of workers are up and running. Review Comment: I took the liberty and edited this comment in this PR. Hope that's ok. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]
KevinZTW commented on code in PR #15884: URL: https://github.com/apache/kafka/pull/15884#discussion_r1596572793 ## tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java: ## @@ -47,22 +47,16 @@ @ClusterTestDefaults(clusterType = Type.KRAFT) Review Comment: oh you are right, I didn't notice that part. Just remove it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
nikramakrishnan commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2104320732 Given that we are marking the latest version as unstable in ListOffsetsResponse, it would make sense to not bump the IBP now as @clolov suggests, and only bump it when ListOffsetsResponse v9 is ready to be marked as stable. Doing this allows us to test v9 with `unstable.api.versions.enable`, and brokers and clients on IV0 will continue to work with v8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
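For reference, exercising a request version that is still marked unstable requires opting in on the broker side with the internal flag named in the comment above — a test-only override, never a production setting:

```properties
# server.properties (test/dev cluster only): advertise API versions still marked unstable
unstable.api.versions.enable=true
```

Brokers without this override treat the unstable version as non-existent, which is exactly the mismatch described in the follow-up comment below.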
Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]
soarez merged PR #15886: URL: https://github.com/apache/kafka/pull/15886 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]
clolov commented on PR #15673: URL: https://github.com/apache/kafka/pull/15673#issuecomment-2104306940 Heya @junrao after inspecting some of the tests I think I have misunderstood how marking an API version as unstable works. From a discussion with Nikhil I think we can only do one of two things - we can either keep the IBP and mark the new version as unstable or we can bump the IBP. Otherwise what happens is that the broker treats the version as non-existent while clients don't respect the configuration and still send the new version. Is this your understanding as well? If it is, then I believe I will just mark the version as unstable while I make the client changes and then I will mark it as stable and bump the IBP all in one commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
chia7712 commented on code in PR #15911: URL: https://github.com/apache/kafka/pull/15911#discussion_r1596516913 ## storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java: ## @@ -49,9 +45,10 @@ public class RemoteLogMetadataManagerTest { private final Time time = new MockTime(1); -@ParameterizedTest(name = "remoteLogMetadataManager = {0}") -@MethodSource("remoteLogMetadataManagers") -public void testFetchSegments(RemoteLogMetadataManager remoteLogMetadataManager) throws Exception { +private RemoteLogMetadataManager remoteLogMetadataManager = new TopicBasedRemoteLogMetadataManagerWrapperWithHarness(); + +@Test +public void testFetchSegments() throws Exception { try { Review Comment: This can be addressed as follow-up I think :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]
chia7712 commented on PR #15911: URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104271236 QA is re-triggered, and I will merge it if QA does not show the related error.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496660 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ## @@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() { final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology)); final String msg = se.getMessage(); -assertTrue("Error about class cast with serdes", msg.contains("StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG")); Review Comment: recovered test ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java: ## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.StateSerdes; + + +public class StoreSerdeInitializer { +static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName, + final String changelogTopic, final Serde keySerde, Review Comment: done
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496472 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ## @@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) { public void init(final InternalProcessorContext context) { super.init(context); this.context = context; -keySerializer = prepareKeySerializer(keySerializer, context, this.name()); -valSerializer = prepareValueSerializer(valSerializer, context, this.name()); +try { +keySerializer = prepareKeySerializer(keySerializer, context, this.name()); +} catch (final ConfigException e) { +throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name())); Review Comment: added original message
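The fix discussed in this review thread — rethrowing the `ConfigException` with the sink node's name while preserving the original message — can be sketched in plain Java. This is a simplified illustration, not Kafka's actual `SinkNode` code: `ConfigException` here is a hypothetical stand-in for the Kafka class, and the node name and serde hint are made-up examples.

```java
public class SerdeInitSketch {
    // Hypothetical stand-in for org.apache.kafka.common.config.ConfigException
    static class ConfigException extends RuntimeException {
        ConfigException(final String msg) { super(msg); }
    }

    // Made-up node name for illustration
    static String name() { return "KSTREAM-SINK-0000000001"; }

    // Simulates serde preparation failing with a generic config error
    static void prepareKeySerializer() {
        throw new ConfigException("Please specify a key serde or set one through "
                + "StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG");
    }

    static void init() {
        try {
            prepareKeySerializer();
        } catch (final ConfigException e) {
            // Rethrow with node context, but append e.getMessage() so the
            // original hint is not lost (the point of the review comment)
            throw new ConfigException(String.format(
                    "Failed to initialize key serdes for sink node %s: %s",
                    name(), e.getMessage()));
        }
    }

    public static void main(final String[] args) {
        String msg = "";
        try {
            init();
        } catch (final ConfigException e) {
            msg = e.getMessage();
        }
        // The rethrown exception carries both the node name and the original hint
        if (!msg.contains("sink node KSTREAM-SINK-0000000001")) throw new AssertionError(msg);
        if (!msg.contains("DEFAULT_KEY_SERDE_CLASS_CONFIG")) throw new AssertionError(msg);
        System.out.println("ok");
    }
}
```

Without the appended original message, a user would see only which node failed but not why, which is what the reviewer flagged.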
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1596492735 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ## @@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) { public void init(final InternalProcessorContext context) { super.init(context); this.context = context; -keySerializer = prepareKeySerializer(keySerializer, context, this.name()); -valSerializer = prepareValueSerializer(valSerializer, context, this.name()); +try { +keySerializer = prepareKeySerializer(keySerializer, context, this.name()); Review Comment: Nice catch! Updated
[PR] Increasing timeout for wait_for_loggers to 30s [kafka]
vamossagar12 opened a new pull request, #15915: URL: https://github.com/apache/kafka/pull/15915 (no comment)
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on PR #15305: URL: https://github.com/apache/kafka/pull/15305#issuecomment-2104218899 Thanks @gharris1727, I have made the changes as you suggested. I also modified the warning line so that it is printed based on the tick thread stage presented to the coordinator, and added a test to verify the same. LMKWYT.
Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]
vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1596476664 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java: ## @@ -267,6 +267,18 @@ public String memberId() { return JoinGroupRequest.UNKNOWN_MEMBER_ID; } +@Override +protected void handlePollTimeoutExpiry() { Review Comment: Thanks Greg, I think that makes sense. I have extended `WorkerRebalanceListener` with another hook that the coordinator invokes upon poll timeout expiry. This way, it can access the tick thread's stage that was executing at that point in time. One thing I am wondering: could there be race conditions where the stage we read does not reflect the point where the tick thread is actually blocked (e.g. it moved on by the time we invoke this, or the stage became null)? I guess that should be OK, since this can be treated as best effort? WDYT?
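The best-effort stage capture being discussed can be sketched with a shared atomic reference: the tick thread publishes its current stage, and the heartbeat-side hook reads it on poll-timeout expiry, tolerating a stale or missing value. This is a simplified illustration under assumed names — `TickStage` and the stage values are hypothetical, not Connect's actual `WorkerRebalanceListener` API.

```java
import java.util.concurrent.atomic.AtomicReference;

public class PollTimeoutSketch {
    // Hypothetical stages of the Connect herder tick loop
    enum TickStage { CONNECTOR_START, TASK_START, OFFSET_COMMIT }

    // Written by the tick thread, read by the heartbeat thread on expiry.
    // The read is best-effort: the tick thread may have moved on (or cleared
    // the stage) by the time the listener fires — the race discussed above.
    private static final AtomicReference<TickStage> currentStage = new AtomicReference<>();

    static void onPollTimeoutExpiry() {
        final TickStage stage = currentStage.get();
        if (stage != null) {
            System.out.println("worker poll timeout expired; tick thread was in stage: " + stage);
        } else {
            System.out.println("worker poll timeout expired; tick thread stage unknown");
        }
    }

    public static void main(final String[] args) {
        // Stage is known at expiry time
        currentStage.set(TickStage.OFFSET_COMMIT);
        onPollTimeoutExpiry();
        // Stage was cleared before the hook ran: log still degrades gracefully
        currentStage.set(null);
        onPollTimeoutExpiry();
    }
}
```

Because the warning is diagnostic only, a slightly stale stage is acceptable; the null branch is what makes the best-effort approach safe.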
Re: [PR] MINOR: Upgrade scala-logging to 3.9.5 [kafka]
viktorsomogyi commented on PR #15914: URL: https://github.com/apache/kafka/pull/15914#issuecomment-2104207428 @ijuma would you please review this small PR?