Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on code in PR #15897:
URL: https://github.com/apache/kafka/pull/15897#discussion_r1597349032


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends 
AbstractApiVersionsRequestTest(cluster) {

Review Comment:
   this test has a redundant `ClusterTestDefaults` as the changed value is equal 
to the default value.
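
   For illustration, a hedged before/after sketch of what dropping such a redundant 
annotation looks like (the class names are hypothetical, and the default set is the 
one shown later in this diff):

   ```java
   import kafka.test.annotation.ClusterTestDefaults;
   import kafka.test.annotation.Type;

   // Redundant: the explicit values equal the annotation defaults.
   @ClusterTestDefaults(clusterTypes = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
   class SomeRequestTest { }

   // Equivalent: no annotation needed, the defaults apply implicitly.
   class SomeOtherRequestTest { }
   ```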



##
core/src/test/java/kafka/test/ClusterConfig.java:
##
@@ -164,7 +166,7 @@ public Map nameTags() {
 
 public static Builder defaultBuilder() {
 return new Builder()
-.setType(Type.ZK)
+.setTypes(Collections.singleton(Type.ZK))

Review Comment:
   We have to align the default value with `ClusterTestDefaults`, right?



##
core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala:
##
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 @Timeout(120)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@ClusterTestDefaults(clusterTypes = Array(Type.KRAFT), brokers = 1)

Review Comment:
   please remove `brokers = 1`



##
core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala:
##
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.ALL)
+@ClusterTestDefaults()

Review Comment:
   ditto



##
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##
@@ -49,7 +48,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL)
+@ClusterTestDefaults()

Review Comment:
   please remove it



##
core/src/test/java/kafka/test/annotation/ClusterTestDefaults.java:
##
@@ -35,7 +35,7 @@
 @Target({TYPE})
 @Retention(RUNTIME)
 public @interface ClusterTestDefaults {
-Type clusterType() default Type.ZK;
+Type[] clusterTypes() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};

Review Comment:
   Maybe it should be renamed to `types` instead of `clusterTypes`. We do have 
a `ClusterType` in testing :)
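
   A minimal sketch of that rename, assuming only the attribute name changes (the 
default set is taken from the diff above):

   ```java
   package kafka.test.annotation;

   import static java.lang.annotation.ElementType.TYPE;
   import static java.lang.annotation.RetentionPolicy.RUNTIME;

   import java.lang.annotation.Retention;
   import java.lang.annotation.Target;

   @Target({TYPE})
   @Retention(RUNTIME)
   public @interface ClusterTestDefaults {
       // renamed from clusterTypes(); callers would write @ClusterTestDefaults(types = {Type.KRAFT})
       Type[] types() default {Type.ZK, Type.KRAFT, Type.CO_KRAFT};
       // other attributes (brokers, controllers, ...) unchanged
   }
   ```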



##
core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java:
##
@@ -44,7 +43,7 @@
 
 @SuppressWarnings("dontUseSystemExit")
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ALL)
+@ClusterTestDefaults()

Review Comment:
   This is redundant if you don't define any values



##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -92,7 +91,7 @@ public void 
testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
 }
 
 @ClusterTests({
-@ClusterTest(clusterType = Type.ZK)
+@ClusterTest(clusterTypes = {Type.ZK})

Review Comment:
   Could you remove redundant `ClusterTests`?



##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -78,8 +78,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) {
 }
 
 @ClusterTests({
-@ClusterTest(clusterType = Type.ZK),
-@ClusterTest(clusterType = Type.KRAFT)
+@ClusterTest(clusterTypes = {Type.ZK, Type.KRAFT}),

Review Comment:
   ditto



##
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##
@@ -78,8 +78,7 @@ public ConfigCommandIntegrationTest(ClusterInstance cluster) {
 }

Review Comment:
   This test has redundant `ClusterTestDefaults`. Could you please remove it 
also?



##
tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java:
##
@@ -73,7 +73,7 @@ public void testDescribeQuorumReplicationSuccessful() throws 
InterruptedExceptio
 );
 
 List outputs = 
stream(describeOutput.split("\n")).skip(1).collect(Collectors.toList());
-if (cluster.config().clusterType() == Type.CO_KRAFT)
+if (cluster.config().clusterTypes().contains(Type.CO_KRAFT))

Review Comment:
   Could you add a new method to expose the `Type`? Otherwise, this check is 
not accurate since users can set multiple types in `ClusterConfig`.
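
   A hedged sketch of what such an accessor could look like; the method name and 
its placement on `ClusterInstance` are assumptions, not the final API:

   ```java
   import kafka.test.annotation.Type;

   public interface ClusterInstance {
       /**
        * The single Type this cluster was actually started with, as opposed to the
        * (possibly multi-valued) set of types configured on ClusterConfig.
        */
       Type type();
   }
   ```

   The check above could then read `if (cluster.type() == Type.CO_KRAFT)`.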



##
tools/src/test/java/org/apache/kafka/tools/MetadataQuorumCommandTest.java:
##
@@ -86,7 +86,7 @@ public void testDescribeQuorumReplicationSuccessful() throws 
InterruptedExceptio
 assertEquals(cluster.config().numControllers() - 1, 
outputs.stream().filter(o -> followerPattern.matcher(o).find()).count());
 
 Pattern observerPattern = 
Pattern.compile("\\d+\\s+\\d+\\s+\\d+\\s+[\\dmsago\\s]+-?[\\dmsago\\s]+Observer\\s*");
-if (cluster.config().clusterType() == Type.CO_KRAFT)
+if (cluster.config().clusterTypes().contains(Type.CO_KRAFT))

Review Comment:
   ditto



##

Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on PR #15917:
URL: https://github.com/apache/kafka/pull/15917#issuecomment-2105525456

   @kamalcph thanks for all your reviews. I have addressed them in 
https://github.com/apache/kafka/pull/15917/commits/ff2c163633b341a6db9b1d9d838cec350a41ac15


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 2) Implement ApplicationState and KafkaStreamsState [kafka]

2024-05-10 Thread via GitHub


ableegoldman commented on code in PR #15920:
URL: https://github.com/apache/kafka/pull/15920#discussion_r1597343428


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -459,6 +468,38 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
 }
 }
 
+private ApplicationState getApplicationState(

Review Comment:
   nit: we don't use "get" in Streams getter names. I guess this isn't exactly 
a pure getter, but still. On that note, perhaps a better name would be 
`buildApplicationState`?
   
   Also: even though it's all internal, I've been on a crusade to get everyone 
to write javadocs for methods in the StreamsPartitionAssignor with at least a 
brief explanation of what it does.
   
   It's just a super complicated class that does a lot and often mutates things 
in a way that isn't obvious, so every little bit of documentation helps



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
 
 // compute the assignment of tasks to threads within each client 
and build the final group assignment
 
+getApplicationState(

Review Comment:
   I know we're just throwing away the return value for now but I'd still do 
this:
   ```suggestion
   final ApplicationState applicationState = getApplicationState(
   ```
   Otherwise it kind of seems like this method is supposed to be mutating the 
input parameters (many of the StreamsPartitionAssignor methods work this way so 
it's good to distinguish when we're just building something vs operating on the 
passed in structures)



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java:
##
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.assignment;
 
-import org.apache.kafka.common.protocol.types.Field.UUID;
+import java.util.UUID;

Review Comment:
   good catch!



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##
@@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, 
final GroupSubscription gr
 
 // compute the assignment of tasks to threads within each client 
and build the final group assignment

Review Comment:
   nit: this comment should stay above the `#computeNewAssignment` call



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+
+public class ApplicationStateImpl implements ApplicationState {
+
+private final AssignmentConfigs assignmentConfigs;
+private final Set<TaskId> statelessTasks;
+private final Set<TaskId> statefulTasks;
+private final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates;
+
+public ApplicationStateImpl(
+final AssignmentConfigs assignmentConfigs,
+final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
+final Set<TaskId> statefulTasks,
+final Set<TaskId> statelessTasks
+) {

Review Comment:
   KafkaStreams formatting for long signatures is (unfortunately) done like this:
   
   ```suggestion
   public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
                               final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
                               final Set<TaskId> statefulTasks,
                               final Set<TaskId> statelessTasks) {
   ```

Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]

2024-05-10 Thread via GitHub


chia7712 merged PR #15904:
URL: https://github.com/apache/kafka/pull/15904





Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on PR #15904:
URL: https://github.com/apache/kafka/pull/15904#issuecomment-2105442338

   |test|jira|
   |-|-|
   |testReplicateFromLatest|https://issues.apache.org/jira/browse/KAFKA-16383|
   |testTaskRequestWithOldStartMsGetsUpdated|https://issues.apache.org/jira/browse/KAFKA-16136|
   |testIndexFileAlreadyExistOnDiskButNotInCache|https://issues.apache.org/jira/browse/KAFKA-16704|
   |testMigrateTopicDeletions|https://issues.apache.org/jira/browse/KAFKA-16045|
   |testReplicateSourceDefault|https://issues.apache.org/jira/browse/KAFKA-15292|
   |testFenceMultipleBrokers|https://issues.apache.org/jira/browse/KAFKA-16634|
   |testUnregisterBroker|https://issues.apache.org/jira/browse/KAFKA-13966|
   |testSyncTopicConfigs|https://issues.apache.org/jira/browse/KAFKA-15945|
   |testSeparateOffsetsTopic|https://issues.apache.org/jira/browse/KAFKA-14089|





[jira] [Created] (KAFKA-16704) Fix flaky kafka.log.remote.RemoteIndexCacheTest#testIndexFileAlreadyExistOnDiskButNotInCache

2024-05-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16704:
--

 Summary: Fix flaky  
kafka.log.remote.RemoteIndexCacheTest#testIndexFileAlreadyExistOnDiskButNotInCache
 Key: KAFKA-16704
 URL: https://issues.apache.org/jira/browse/KAFKA-16704
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


java.io.UncheckedIOException: java.nio.file.NoSuchFileException: 
/tmp/kafka-RemoteIndexCacheTest3690189103734187552/R68MBnutRfmqJY66XXFoOA:foo-0/remote-log-index-cache/2147584984_Ma8JCqucS7mqKIHfSSDeow.txnindex.deleted
 at 
java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
 at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
 at java.base/java.util.Iterator.forEachRemaining(Iterator.java:132)
 at 
java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
 at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
 at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
 at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
 at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
 at 
kafka.log.remote.RemoteIndexCacheTest.renameRemoteCacheIndexFileFromDisk$1(RemoteIndexCacheTest.scala:832)
 at 
kafka.log.remote.RemoteIndexCacheTest.testIndexFileAlreadyExistOnDiskButNotInCache(RemoteIndexCacheTest.scala:851)
 at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base/java.lang.reflect.Method.invoke(Method.java:566)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-10 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1597305375


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}
+
+public short featureLevel() {
+return featureLevel;
+}
+
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, 
List features) {
+// version 1 depends on metadata.version 3.3-IV0
+if (featureLevel >= 1 && 
metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0))
+throw new IllegalArgumentException(FEATURE_NAME + " could not be 
set to " + featureLevel +
+" because it depends on metadata.version=14 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+}
+
+public static TestFeatureVersion metadataVersionMapping(MetadataVersion 
metadataVersion) {
+if (metadataVersion.isLessThan(MetadataVersion.IBP_3_8_IV0)) {

Review Comment:
   This will be simplified when I fix the above. Potentially we can even 
include the 3.3 check in the FeatureVersion class since it will apply to all 
features and doesn't change on a per feature basis. Stay tuned for some 
cleanups.
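
   A hedged sketch of what hoisting that shared check could look like; the class and 
method names are assumptions, and only the guard itself is taken from the diff above:

   ```java
   import org.apache.kafka.server.common.MetadataVersion;

   final class FeatureLevelValidation {
       // Any feature level >= 1 requires at least metadata.version 3.3-IV0, so the
       // guard could live in one shared place instead of being repeated per feature.
       static void requireMinimumMetadataVersion(String featureName, short featureLevel, MetadataVersion metadataVersion) {
           if (featureLevel >= 1 && metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0)) {
               throw new IllegalArgumentException(featureName + " could not be set to " + featureLevel +
                   " because it requires metadata.version of at least " + MetadataVersion.IBP_3_3_IV0);
           }
       }
   }
   ```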






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-10 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1597304812


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.List;
+
+public enum TestFeatureVersion implements FeatureVersion {
+
+TEST_0(0),
+TEST_1(1),
+TEST_2(2);
+
+private short featureLevel;
+
+public static final String FEATURE_NAME = "test.feature.version";
+public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+
+TestFeatureVersion(int featureLevel) {
+this.featureLevel = (short) featureLevel;
+}

Review Comment:
   I have a plan for this but didn't quite get to it. Maybe we want to do this 
and maybe we don't. 
   
   One idea is to have all features have a standard pattern/class for the 
fields. This would contain the metadata version mapping and a Map of the required other features. Not sure if we will need any more 
complicated validation logic than that. We could leave the opportunity to have 
a more complicated validate method if we choose. 
   
   We can also show an example implementation in the TestFeatureVersion and 
folks can choose to replicate it.
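
   Purely as an illustration of the "standard pattern/class for the fields" idea 
described above (every name below is an assumption, not the eventual design):

   ```java
   import java.util.Collections;
   import java.util.Map;

   import org.apache.kafka.server.common.MetadataVersion;

   // Each feature version would carry its bootstrap metadata.version mapping plus the
   // minimum levels of the other features it depends on; a generic validation step
   // could then check these fields uniformly instead of per-feature hand-written logic.
   public final class FeatureDependencies {
       private final MetadataVersion bootstrapMetadataVersion;
       private final Map<String, Short> minimumRequiredFeatureLevels;

       public FeatureDependencies(MetadataVersion bootstrapMetadataVersion,
                                  Map<String, Short> minimumRequiredFeatureLevels) {
           this.bootstrapMetadataVersion = bootstrapMetadataVersion;
           this.minimumRequiredFeatureLevels = Collections.unmodifiableMap(minimumRequiredFeatureLevels);
       }

       public MetadataVersion bootstrapMetadataVersion() {
           return bootstrapMetadataVersion;
       }

       public Map<String, Short> minimumRequiredFeatureLevels() {
           return minimumRequiredFeatureLevels;
       }
   }
   ```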






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-10 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1597303887


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -258,6 +259,13 @@ public enum MetadataVersion {
 this.didMetadataChange = didMetadataChange;
 }
 
+public String featureName() {
+return FEATURE_NAME;
+}
+
+public void validateVersion(MetadataVersion metadataVersion, 
List features) {
+}

Review Comment:
   I removed this for now, we can add it back later if we want MetadataVersion 
to implement our interface. I think there are pros and cons for doing so.






[PR] KAFKA-16703: Close serverChannel in SocketServer if unable to bind to a port [kafka]

2024-05-10 Thread via GitHub


gharris1727 opened a new pull request, #15923:
URL: https://github.com/apache/kafka/pull/15923

   This was showing up as a leaked socket in 
KafkaServerTest#testListenerPortAlreadyInUse.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16703) SocketServer leaks ServerSocketChannel when port is already in-use

2024-05-10 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16703:
---

 Summary: SocketServer leaks ServerSocketChannel when port is 
already in-use
 Key: KAFKA-16703
 URL: https://issues.apache.org/jira/browse/KAFKA-16703
 Project: Kafka
  Issue Type: Test
Reporter: Greg Harris
Assignee: Greg Harris


The SocketServer#openServerSocket method creates a serverSocket, and then 
attempts to bind it to the selected port. If the port is already in-use, an 
exception is propagated and the serverSocket is never closed.

This causes KafkaServerTest#testListenerPortAlreadyInUse to leak sockets.
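
A minimal, hedged illustration of the fix pattern (the real method is Scala in 
SocketServer#openServerSocket; this Java sketch only shows the close-before-rethrow 
shape, and the helper signature is an assumption):
{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindExample {
    static ServerSocketChannel openServerSocket(String host, int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        try {
            // Throws (e.g. BindException) if the port is already in use.
            serverChannel.bind(new InetSocketAddress(host, port));
            return serverChannel;
        } catch (IOException e) {
            // Close the channel before propagating, so it is not leaked.
            serverChannel.close();
            throw e;
        }
    }
}
{code}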





[PR] KAFKA-16702: Fix producer leaks in KafkaLog4jAppenderTest [kafka]

2024-05-10 Thread via GitHub


gharris1727 opened a new pull request, #15922:
URL: https://github.com/apache/kafka/pull/15922

   The tests `testRealProducerConfigWithSyncSendShouldNotThrowException` and 
`testRealProducerConfigWithSyncSendAndNotIgnoringExceptionsShouldThrowException`
 create real producer instances, which are leaked when the test exits.
   
   Instead, each test should be followed by a cleanup operation where the 
registered appender is removed and closed.
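
   A hedged sketch of what that per-test cleanup could look like with the log4j 1.x 
API (the appender name, the JUnit 5 @AfterEach hook, and the placement inside the 
test class are assumptions):

   ```java
   import org.apache.log4j.Appender;
   import org.apache.log4j.Logger;
   import org.junit.jupiter.api.AfterEach;

   // Detach and close whatever appender the test registered on the root logger, so
   // the real KafkaProducer created by KafkaLog4jAppender is closed as well.
   @AfterEach
   void removeRegisteredAppender() {
       Logger rootLogger = Logger.getRootLogger();
       Appender appender = rootLogger.getAppender("KAFKA"); // registered name is an assumption
       if (appender != null) {
           rootLogger.removeAppender(appender);
           appender.close();
       }
   }
   ```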
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Created] (KAFKA-16702) KafkaLog4jAppenderTest leaks producer instances

2024-05-10 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16702:
---

 Summary: KafkaLog4jAppenderTest leaks producer instances
 Key: KAFKA-16702
 URL: https://issues.apache.org/jira/browse/KAFKA-16702
 Project: Kafka
  Issue Type: Test
Affects Versions: 3.8.0
Reporter: Greg Harris
Assignee: Greg Harris


The KafkaLog4jAppenderTest has the method getLog4jConfigWithRealProducer which 
naturally creates a real producer. This appender is never cleaned up within the 
test, so the producer is leaked.





[PR] MINOR: Add classic member session timeout to ClassicMemberMetadata [kafka]

2024-05-10 Thread via GitHub


dongnuo123 opened a new pull request, #15921:
URL: https://github.com/apache/kafka/pull/15921

   The heartbeat API to a consumer group with classic protocol members schedules the 
session timeout. At present, there is no way to get a classic member's session 
timeout in the heartbeat to the consumer group.
   
   This patch stores the session timeout in the ClassicMemberMetadata of 
ConsumerGroupMemberMetadataValue and updates it when it is provided in the join 
request.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-16513) Allow WriteTxnMarkers API with Alter Cluster Permission

2024-05-10 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-16513.

Resolution: Fixed

> Allow WriteTxnMarkers API with Alter Cluster Permission
> ---
>
> Key: KAFKA-16513
> URL: https://issues.apache.org/jira/browse/KAFKA-16513
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Nikhil Ramakrishnan
>Assignee: Siddharth Yagnik
>Priority: Minor
>  Labels: KIP-1037
> Fix For: 3.8.0
>
>
> We should allow WriteTxnMarkers API with Alter Cluster Permission because it 
> can be invoked externally by a Kafka AdminClient. Such usage is more aligned 
> with the Alter permission on the Cluster resource, which includes other 
> administrative actions invoked from the Kafka AdminClient.





Re: [PR] Allowing WriteTxnMarkers API to run with AlterCluster permissions [kafka]

2024-05-10 Thread via GitHub


jolshan merged PR #15837:
URL: https://github.com/apache/kafka/pull/15837





Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-10 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1597073119


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
 * Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
+* 1. If the message offset is less than the log-start-offset (or) 
local-log-start-offset, then it returns the
+   * message-only metadata.
+* 2. If the message offset is beyond the log-end-offset, then it returns 
the message-only metadata.
+* 3. For all other cases, it returns the offset metadata from the log.
 */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
-checkLogStartOffset(offset)
-localLog.convertToOffsetMetadataOrThrow(offset)
+  private[log] def convertToOffsetMetadata(offset: Long): LogOffsetMetadata = {

Review Comment:
   convertToOffsetMetadata => maybeConvertToOffsetMetadata ?
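
   For readers following along, a hedged Java restatement of the three cases in the 
javadoc above (the real method is Scala in UnifiedLog; the field and helper names 
here are assumptions):

   ```java
   LogOffsetMetadata maybeConvertToOffsetMetadata(long offset) {
       // Cases 1 and 2: below the (local) log start offset, or beyond the log end
       // offset -> return message-only metadata (no segment position).
       if (offset < logStartOffset || offset < localLogStartOffset || offset > logEndOffset) {
           return new LogOffsetMetadata(offset);
       }
       // Case 3: look up the full offset metadata from the log.
       return localLog.convertToOffsetMetadataOrThrow(offset);
   }
   ```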



##
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##
@@ -164,18 +169,71 @@ class DelayedFetchTest {
 assertTrue(delayedFetch.tryComplete())
 assertTrue(delayedFetch.isCompleted)
 assertTrue(fetchResultOpt.isDefined)
+
+val fetchResult = fetchResultOpt.get
+assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark 
minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+val fetchOffset = 450L
+val logStartOffset = 5L
+val currentLeaderEpoch = Optional.of[Integer](10)
+val replicaId = 1
+
+val fetchStatus = FetchPartitionStatus(
+  startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+  fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, 
minBytes = minBytes)
+
+var fetchResultOpt: Option[FetchPartitionData] = None
+def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+  fetchResultOpt = Some(responses.head._2)
+}
+
+val delayedFetch = new DelayedFetch(
+  params = fetchParams,
+  fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+  replicaManager = replicaManager,
+  quota = replicaQuota,
+  responseCallback = callback
+)
+
+val partition: Partition = mock(classOf[Partition])
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+// high-watermark is lesser than the log-start-offset
+val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+when(partition.fetchOffsetSnapshot(
+  currentLeaderEpoch,
+  fetchOnlyFromLeader = true))
+  .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+val expected = minBytes == 1
+assertEquals(expected, delayedFetch.tryComplete())
+assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   This exposes an issue in delayedFetch. If HWM is less than fetchOffset, we 
haven't gained any bytes. So, we shouldn't complete the delayedFetch 
immediately.



##
storage/src/test/java/org/apache/kafka/storage/internals/log/LogOffsetMetadataTest.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.kafka.storage.internals.log.LogOffsetMetadata.UNKNOWN_OFFSET_METADATA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static 

[jira] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-10 Thread Philip Nee (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-16687 ]


Philip Nee deleted comment on KAFKA-16687:


was (Author: JIRAUSER283568):
[~fortherightous] - Not sure if this is what you are seeing:

 
{code:java}
Native Memory Tracking (reserved=3835KB +481KB, committed=3835KB +481KB)
  55                             (malloc=458KB +234KB #6808 +3631)
  56                             (tracking overhead=3376KB +247KB) {code}
and

 
{code:java}
 290 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
 291 [0x000147f01034]
 292                              (malloc=43KB type=Other +43KB #4 +4)
 293
 294 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
 295 [0x00014753d8ac]
 296                              (malloc=996KB type=Other -1249KB #94 -102) 
{code}

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project that uses the Maven dependency kafka-clients, 
> version 3.7.0.
> My Java application uses a Kafka Consumer to poll a Kafka broker topic 
> continuously.
> I have configured my Java application with the JVM options -Xms8G -Xmx8G 
> -XX:MaxMetaspaceSize=4G and run it.
> There are also 16G of physical memory on my virtual machine.
> After my Java application had been running for a long time, I found that the 
> resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate into swap space.
> I checked it with jmap -heap pid and found that heap memory usage is OK.
> With Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I found 
> that it is caused by [NMT Internal] memory created by Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> I also checked the kafka-clients source code; it contains code that uses 
> "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it.
> 
> Could you help check this? How can I stop this native memory growth so that 
> my system does not hang?





[PR] KAFKA-15045: (KIP-924): Implement ApplicationState and KafkaStreamsState [kafka]

2024-05-10 Thread via GitHub


apourchet opened a new pull request, #15920:
URL: https://github.com/apache/kafka/pull/15920

   This PR implements read-only container classes for ApplicationState and 
KafkaStreamsState, and initializes those within
   StreamsPartitionAssignor#assign.
   
   New internal methods were also added to the ClientState to easily pass this 
data through to the KafkaStreamsState.
   
   One test was added to check the lag sorting within the implementation of 
KafkaStreamsState, which is the counterpart to the test that existed for the 
ClientState class. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on PR #15909:
URL: https://github.com/apache/kafka/pull/15909#issuecomment-2105274302

   Thanks all for the helpful feedback! Let's wait for the build and we should 
be good @mjsax





[jira] [Commented] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException

2024-05-10 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845497#comment-17845497
 ] 

Justine Olshan commented on KAFKA-16699:


Yay yay! I'm happy this is getting fixed :) 

> Have Streams treat InvalidPidMappingException like a ProducerFencedException
> 
>
> Key: KAFKA-16699
> URL: https://issues.apache.org/jira/browse/KAFKA-16699
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>
> KStreams is able to handle the ProducerFenced (among other errors) cleanly. 
> It does this by closing the task dirty and triggering a rebalance amongst the 
> worker threads to rejoin the group. The producer is also recreated. Due to 
> how streams works (writing to and reading from various topics), the 
> application is able to figure out the last thing the fenced producer 
> completed and continue from there.
> KStreams EOS V2 also trusts that any open transaction (including those whose 
> producer is fenced) will be aborted by the server. This is a key factor in 
> how it is able to operate. In EOS V1, the new InitProducerId fences and 
> aborts the previous transaction. In either case, we are able to reason about 
> the last valid state from the fenced producer and how to proceed.
> h2. InvalidPidMappingException ≈ ProducerFenced
> I argue that InvalidPidMappingException can be handled in the same way. Let 
> me explain why.
> There are two cases we see this error:
> # {{txnManager.getTransactionState(transactionalId).flatMap { case None => Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
> # {{if (txnMetadata.producerId != producerId) Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
> h3. Case 1
> We are missing a value in the transactional state map for the transactional 
> ID. Under normal operations, this is only possible when the transactional ID 
> expires via the mechanism described above after 
> {{transactional.id.expiration.ms}} of inactivity. In this case, there is no 
> state that needs to be reconciled. It is safe to just rebalance and rejoin 
> the group with a new producer. We probably don’t even need to close the task 
> dirty, but it doesn’t hurt to do so.
> h3. Case 2
> This is a bit more interesting. It says that we have transactional state, but 
> the producer ID in the request does not match the producer ID associated with 
> the transactional ID on the broker. How can this happen?
> It is possible that a new producer instance B with the same transactional ID 
> was created after the transactional state expired for instance A. Given there 
> is no state on the server when B joins, it will get a totally new producer 
> ID. If the original producer A comes back, it will have state for this 
> transactional ID but the wrong producer ID.
> In this case, the old producer ID is fenced, it’s just the normal epoch-based 
> fencing logic doesn’t apply. We can treat it the same however.
> h2. Summary
> As described in the cases above, any time we encounter the InvalidPidMapping 
> during normal operation, the previous producer was either finished with its 
> operations or was fenced. Thus, it is safe to close the task dirty and rebalance + 
> rejoin the group just as we do with the ProducerFenced exception.
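
For illustration only, a hedged sketch of the handling pattern argued for above 
(Streams' real error handling is considerably more involved; the helper calls in 
this fragment are hypothetical):
{code:java}
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.ProducerFencedException;

try {
    producer.commitTransaction();
} catch (ProducerFencedException | InvalidPidMappingException e) {
    // Same recovery path for both: close the task dirty, recreate the producer,
    // and rejoin the group via a rebalance.
    closeTaskDirty(task);
    recreateProducer();
    triggerRebalance();
}
{code}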





[jira] [Assigned] (KAFKA-15804) Broker leaks ServerSocketChannel when exception is thrown from ZkConfigManager during startup

2024-05-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-15804:
---

Fix Version/s: 3.8.0
 Assignee: Greg Harris
   Resolution: Fixed

> Broker leaks ServerSocketChannel when exception is thrown from 
> ZkConfigManager during startup
> -
>
> Key: KAFKA-15804
> URL: https://issues.apache.org/jira/browse/KAFKA-15804
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 3.8.0
>
>
> This exception is thrown during the 
> RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic
>  test in zk mode:
> {noformat}
> org.apache.kafka.common.config.ConfigException: You have to delete all topics 
> with the property remote.storage.enable=true before disabling tiered storage 
> cluster-wide
> at 
> org.apache.kafka.storage.internals.log.LogConfig.validateRemoteStorageOnlyIfSystemEnabled(LogConfig.java:566)
>         at kafka.log.LogManager.updateTopicConfig(LogManager.scala:956)
>         at 
> kafka.server.TopicConfigHandler.updateLogConfig(ConfigHandler.scala:73)
> at 
> kafka.server.TopicConfigHandler.processConfigChanges(ConfigHandler.scala:94)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$4(ZkConfigManager.scala:176)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$4$adapted(ZkConfigManager.scala:175)
> at scala.collection.immutable.Map$Map2.foreach(Map.scala:360)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$1(ZkConfigManager.scala:175)
> at 
> kafka.server.ZkConfigManager.$anonfun$startup$1$adapted(ZkConfigManager.scala:166)
> at scala.collection.immutable.HashMap.foreach(HashMap.scala:1115)
> at kafka.server.ZkConfigManager.startup(ZkConfigManager.scala:166)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:575)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
> at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
> at scala.collection.immutable.List.foreach(List.scala:333)
> at 
> kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
> at 
> kafka.integration.KafkaServerTestHarness.recreateBrokers(KafkaServerTestHarness.scala:146)
> at 
> kafka.admin.RemoteTopicCrudTest.$anonfun$testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic$1(RemoteTopicCrudTest.scala:319)
> at 
> org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:53)
> at 
> org.junit.jupiter.api.AssertThrows.assertThrows(AssertThrows.java:35)
> at org.junit.jupiter.api.Assertions.assertThrows(Assertions.java:3111)
> at 
> kafka.admin.RemoteTopicCrudTest.testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(RemoteTopicCrudTest.scala:319){noformat}
> This leak only occurs for this one test in the RemoteTopicCrudTest; all other 
> tests including the kraft-mode version do not exhibit a leaked socket.
> Here is where the ServerSocket is instantiated:
> {noformat}
> at 
> java.base/java.nio.channels.ServerSocketChannel.open(ServerSocketChannel.java:113)
>         at kafka.network.Acceptor.openServerSocket(SocketServer.scala:724)
>         at kafka.network.Acceptor.(SocketServer.scala:608)
>         at kafka.network.DataPlaneAcceptor.(SocketServer.scala:454)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptor(SocketServer.scala:270)
>         at 
> kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:249)
>         at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)
>         at 
> kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)
>         at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
>         at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>         at kafka.network.SocketServer.(SocketServer.scala:175)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:344)
>         at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1(KafkaServerTestHarness.scala:356)
>         at 
> kafka.integration.KafkaServerTestHarness.$anonfun$createBrokers$1$adapted(KafkaServerTestHarness.scala:352)
>         at scala.collection.immutable.List.foreach(List.scala:333)
>         at 
> kafka.integration.KafkaServerTestHarness.createBrokers(KafkaServerTestHarness.scala:352)
>         

Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2024-05-10 Thread via GitHub


gharris1727 merged PR #14729:
URL: https://github.com/apache/kafka/pull/14729





Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1597219039


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -193,11 +193,8 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 }
 pollTimer.update(currentTimeMs);
 if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
-logger.warn("Consumer poll timeout has expired. This means the 
time between " +

Review Comment:
   Done, I did like the simplified log but totally agree with your points, 
both. I've been myself pushing for avoiding changing the existing logs content 
when possible because I've also heard about customers basing their apps on 
them. Also agree about the more complete output on the case of not hitting the 
next poll in a sensible time. 
   
   So left the log here unchanged (and simplified the other just to not repeat 
ourselves on the 2 logs). So in the common case that we end up with the 2 log 
lines, it's just a first one about the situation when it happens, and the 2nd 
one with the approximate exceeded time when we have the most accurate info. 
Makes sense?






Re: [PR] KAFKA-15804: Close SocketServer channels when calling shutdown before enableRequestProcessing [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on PR #14729:
URL: https://github.com/apache/kafka/pull/14729#issuecomment-2105256727

   I noticed some instability in th SocketServerTest suite locally, but it 
doesn't appear to be introduced by this change. It appears on trunk (and 3.7, 
3.6, 3.5) and coincides with JDK >= 17.
   
   I ran this test suite locally with JDK 11 and got consistent passes, and it 
passes in CI. The other failures in CI look unrelated, and pass locally.
   
   I think I'm comfortable merging this PR at this time.





Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1597219039


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -193,11 +193,8 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 }
 pollTimer.update(currentTimeMs);
 if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
-logger.warn("Consumer poll timeout has expired. This means the 
time between " +

Review Comment:
   Done, I did like the simplified log but totally agree with your points, 
both. I've been myself pushing for avoiding changing the existing logs content 
when possible because I've also heard about customers basing their apps on 
them. Also agree about the more complete output on the case of not hitting the 
next poll. 
   
   So left the log here unchanged (and simplified the other just to not repeat 
ourselves on the 2 logs). So in the common case that we end up with the 2 log 
lines, it's just a first one about the situation when it happens, and the 2nd 
one with the approximate exceeded time when we have the most accurate info. 
Makes sense?






[jira] [Updated] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally

2024-05-10 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-16701:

Description: 
These tests are failing for me on a local development environment, but don't 
appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. 
I'm using an M1 Mac, so it is possible that either the Mac's linear port 
allocation, or a native implementation is impacting this.

closingChannelSendFailure()

 
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
at 
kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
at 
kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
at 
kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat}
closingChannelWithBufferedReceivesFailedSend()

 
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
at 
kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat}
closingChannelWithCompleteAndIncompleteBufferedReceives()
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
at 
kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511)
 {noformat}
remoteCloseWithBufferedReceives()
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
at 
kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453){noformat}

  was:
These tests are failing for me on a local development environment, but don't 
appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. 
I'm using an M1 Mac, so it is possible that either the Mac's linear port 
allocation, or a native implementation is impacting this.

closingChannelSendFailure()

 
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
at 
kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
at 
kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
at 
kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat}
closingChannelWithBufferedReceivesFailedSend()

 
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
at 

[jira] [Created] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally

2024-05-10 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16701:
---

 Summary: Some SocketServerTest buffered close tests flaky failing 
locally
 Key: KAFKA-16701
 URL: https://issues.apache.org/jira/browse/KAFKA-16701
 Project: Kafka
  Issue Type: Test
  Components: core, unit tests
Affects Versions: 3.7.0, 3.6.0, 3.5.0
Reporter: Greg Harris


These tests are failing for me on a local development environment, but don't 
appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. 
I'm using an M1 Mac, so it is possible that either the Mac's linear port 
allocation, or a native implementation is impacting this.

closingChannelSendFailure()

 
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
at 
kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
at 
kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
at 
kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat}
closingChannelWithBufferedReceivesFailedSend()

 
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
at 
kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat}
closingChannelWithCompleteAndIncompleteBufferedReceives()
{noformat}
java.lang.AssertionError: receiveRequest timed out at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
 at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
 at 
kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511)
{noformat}
remoteCloseWithBufferedReceives()
{noformat}
java.lang.AssertionError: receiveRequest timed out
at 
kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
at 
kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
at 
kafka.network.SocketServerTest.remoteCloseWithBufferedReceives(SocketServerTest.scala:1453){noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


ableegoldman commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1597202905


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -193,11 +193,8 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 }
 pollTimer.update(currentTimeMs);
 if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
-logger.warn("Consumer poll timeout has expired. This means the 
time between " +

Review Comment:
   Can you actually leave this log untouched? On the one hand I kind of agree 
with this simplification, and logs are by no means a part of the public 
contract, but I know for a fact that some people have built observation tools 
and/or dashboards for things like rebalancing issues by searching for relevant 
log strings such as this one (I know because I built one myself a long time ago)
   
   I don't feel super strongly about this so  I won't push back if you'd prefer 
to clean it up, but imo it doesn't hurt to leave the log here as well
   
   Also: in some extreme cases, e.g. an infinite loop in a user's processing 
logic, the consumer might never return to call `poll` at all. In less extreme 
cases, e.g. long processing that takes on the order of minutes per record, it 
might be a very long time before the consumer gets back to poll and logs the 
message you added. For the latter case, I think it would be valuable to keep 
the part about increasing max.poll.interval.ms or lowering max.poll.records in 
the message we log here, when the max poll interval is first missed, so that 
users know what to do immediately and don't have to wait until they actually 
get through all 1000 records (or whatever max.poll.records is set to) and 
finally return to poll to see a hint about which configs to change.
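
   For reference, the two settings that hint points at are plain consumer 
configs. A minimal, illustrative sketch of tuning them (broker address, group 
id, topic and the concrete values below are made up; the right numbers depend 
on per-record processing time):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollIntervalTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "poll-tuning-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Give the application more time between poll() calls...
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);
        // ...or hand back fewer records per poll() so each batch is processed sooner.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
```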



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16700) Kafka Streams: possible message loss on KTable-KTable FK Left Join

2024-05-10 Thread Jira
Karsten Stöckmann created KAFKA-16700:
-

 Summary: Kafka Streams: possible message loss on KTable-KTable FK 
Left Join
 Key: KAFKA-16700
 URL: https://issues.apache.org/jira/browse/KAFKA-16700
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
 Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3 
controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators
Reporter: Karsten Stöckmann


We are experiencing significant, yet intermittent / non-deterministic / 
unexplainable message loss on a Kafka Streams topology while performing a 
*KTable-KTable* {*}FK Left Join{*}.

Assume the following snippet:
{code:java}
streamsBuilder
.table(
folderTopicName,
Consumed.with(
folderKeySerde,
folderSerde))
.leftJoin(
agencies, // KTable
Folder::agencyIdValue,
AggregateFolder::new,
TableJoined.as("folder-to-agency"),
Materialized
.as("folder-to-agency-materialized")
.withKeySerde(folderKeySerde)
.withValueSerde(aggregateFolderSerde))
.leftJoin(
documents,
{code}
The setup is as follows:

A Debezium Connector for PostgreSQL streams database changes into various Kafka 
topics. A series of Quarkus Kafka Streams applications then performs 
aggregation operations on those topics to create index documents later to be 
sent into an OpenSearch system.

When firing up the Kafka Streams infrastructure to work on initially populated 
Kafka Topics (i.e. a snapshot of all relevant table data has been streamed to 
Kafka), the above shown KTable-KTable FK Left Join seems to produce message 
loss on the first of a series of FK Left Joins; the right hand 
{{KTable}} is consumed from an aggregated topic fed 
from another Kafka Streams topology / application.

On a (heavily reduced) test data set of 6828 messages in the 
{{folderTopicName}} Topic, we observe the following results:
 * {{{}folder-to-agency-subscription-registration{}}}: *6967* messages
 * {{{}folder-to-agency-subscription-response{}}}: *3048* messages
 * {{{}folder-to-agency-subscription-store-changelog{}}}: *6359* messages
 * {{{}folder-to-agency-materialized-changelog{}}}: *4644* messages.

Given the nature of a (FK) Left Join, I'd expect every message from the 
left-hand topic to produce an aggregate even if no matching message is found 
in the right-hand topic.
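
For illustration, a minimal sketch of that expectation with String keys and 
values (topic names and the joiner below are placeholders, not the actual 
topology above): a left-join result is produced for every left-hand record, 
with a null right-hand value when no match exists.
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkLeftJoinExpectation {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> folders = builder.table("folders");
        KTable<String, String> agencies = builder.table("agencies");

        // A join result is expected for every folder record; agency is null
        // when the foreign key has no match on the right-hand side.
        KTable<String, String> joined = folders.leftJoin(
                agencies,
                folderValue -> folderValue, // foreign-key extractor, illustrative only
                (folder, agency) -> folder + "|" + (agency == null ? "no-match" : agency));

        joined.toStream().to("folder-to-agency");
        System.out.println(builder.build().describe());
    }
}
{code}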

Message loss unpredictably varies across tests and seems not to be bound to 
specific keys or messages.

It seems this can only be observed when initially firing up the Streams 
infrastructure to process the message 'backlog' snapshotted by Debezium. A 
manual snapshot triggered later (i.e. with the Streams applications already 
running) does not seem to show this behaviour. Additionally, so far we have 
observed this kind of message loss only when running multiple replicas of the 
affected application. When carrying out the tests with only one replica, 
everything seems to work as expected. We've tried to leverage 
{{group.initial.rebalance.delay.ms}} to rule out possible rebalancing issues, 
but to no avail.

Our Kafka configuration:
{code:yaml}
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
message.max.bytes: "20971520"
{code}
Our Kafka Streams application configuration:
{code:yaml}
kafka-streams.num.stream.threads: 5
kafka-streams.num.standby.replicas: 1
kafka-streams.auto.offset.reset: earliest
kafka-streams.cache.max.bytes.buffering: "20971520"
kafka-streams.commit.interval.ms: 100
kafka-streams.fetch.max.bytes: "10485760"
kafka-streams.max.request.size: "10485760"
kafka-streams.max.partition.fetch.bytes: "10485760"
kafka-streams.metadata.max.age.ms: 30
kafka-streams.statestore.cache.max.bytes: "20971520"
kafka-streams.topology.optimization: all
kafka-streams.processing.guarantee: exactly_once_v2

# Kafka Streams Intermediate Topics
kafka-streams.topic.compression.type: lz4
kafka-streams.topic.segment.ms: "4320" # 12h
kafka-streams.topic.max.compaction.lag.ms: "8640" # 24h
kafka-streams.topic.delete.retention.ms: "8640" # 24h

kafka-streams.producer.max.request.size: "20971520" # 20MiB
kafka-streams.producer.transaction.timeout.ms: 100 # Should match 
commit.interval.ms, set close to 100ms for exactly_once_v2

kafka-streams.consumer.group.instance.id: ${HOSTNAME}
kafka-streams.consumer.heartbeat.interval.ms: 100
kafka-streams.consumer.session.timeout.ms: 45000
{code}
All input (and aggregate) topics feature 15 partitions and share this 
configuration:
{code:yaml}

[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-10 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845464#comment-17845464
 ] 

Philip Nee commented on KAFKA-16687:


[~fortherightous] - Not sure if this is what you are seeing:

 
{code:java}
Native Memory Tracking (reserved=3835KB +481KB, committed=3835KB +481KB)
  55                             (malloc=458KB +234KB #6808 +3631)
  56                             (tracking overhead=3376KB +247KB) {code}
and

 
{code:java}
 290 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
 291 [0x000147f01034]
 292                              (malloc=43KB type=Other +43KB #4 +4)
 293
 294 [0x0001040af538] Unsafe_AllocateMemory0(JNIEnv_*, _jobject*, long)+0xcc
 295 [0x00014753d8ac]
 296                              (malloc=996KB type=Other -1249KB #94 -102) 
{code}

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java Project which using Maven dependency Kafka-clients with 
> 3.7.0 version.
> My Java application logic is to use Kafka Consumer to poll Kakfa broker topic 
>  continuously. 
> I have configured my Java application with JVM options with -Xms8G -Xmx8G  
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there are 16G physical memory on my virtual machine. 
> After my Java application running a long time, I have found that resident 
> memory of the Java Process was being grown to more than 14G.
> In the end, the Java process ate Swap space. 
> I checked it with jmap -heap pid, and found heap memory usage is Ok. 
> Also with Native Memory Tracking [jcmd pid Native.memory detail.diff], I 
> found that it's caused by [NMT Internal] memory,  which created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> And I check it the Kafka-clients source code, it have codes with use 
> "sun.misc.unsafe" to allocate memory.  And MaxMetaspaceSize not work for it . 
>  
> Could you help to check it? How could I to stop this growing native memory to 
> avoid my System hang?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]

2024-05-10 Thread via GitHub


C0urante merged PR #15906:
URL: https://github.com/apache/kafka/pull/15906


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]

2024-05-10 Thread via GitHub


C0urante commented on PR #15906:
URL: https://github.com/apache/kafka/pull/15906#issuecomment-2105098368

   CI has passed on at least one node and the changes are trivial; merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-10 Thread via GitHub


jsancio commented on PR #15825:
URL: https://github.com/apache/kafka/pull/15825#issuecomment-2105087021

   @kamalcph, looks like a bug to me. The predicate should be `if 
(!hwm.messageOffsetOnly)` or the if/else blocks should be swapped. I suspect 
that we haven't noticed this bug in the KRaft implementation 
(`KafkaRaftClient`) because kraft never looks at the segment and byte position 
for the HWM.
   
   If you are going to fix this code, do you mind adding a test for this case? 
Since `KafkaMetadataLog` calls `UnifiedLog.fetchOffsetSnapshot`, 
`hwm.messageOffsetOnly` should always be false.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


AndrewJSchofield commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1597080823


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) {
  * member to {@link MemberState#JOINING}, so that it rejoins the group.
  */
 public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);
 if (pollTimer.isExpired()) {

Review Comment:
   Yes, makes sense. When I was reviewing the previous iteration, I found 
myself looking within the Timer at the internal variables and then trying to 
figure out whether the derivation being performed was valid. Makes sense to do 
it within the Timer. Perfectly happy with 2 methods like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]

2024-05-10 Thread via GitHub


kamalcph commented on code in PR #15917:
URL: https://github.com/apache/kafka/pull/15917#discussion_r1597048065


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
+
+class RemoteLogMetadataManagerTestUtils {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class);
+
+static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
+static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
+static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L;
+
+static Builder builder() {
+return new Builder();
+}
+
+static class Builder {
+private String bootstrapServers;
+private Set<TopicIdPartition> topicIdPartitions = Collections.emptySet();
+private boolean startConsumerThread;
+private RemoteLogMetadataTopicPartitioner 
remoteLogMetadataTopicPartitioner;
+private Map<String, Object> overrideRemoteLogMetadataManagerProps = Collections.emptyMap();
+
+private Builder() {
+}
+
+public Builder bootstrapServers(String bootstrapServers) {
+this.bootstrapServers = Objects.requireNonNull(bootstrapServers);
+return this;
+}
+
+public Builder topicIdPartitions(Set<TopicIdPartition> topicIdPartitions) {
+this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions);
+return this;
+}
+
+public Builder startConsumerThread(boolean startConsumerThread) {
+this.startConsumerThread = startConsumerThread;
+return this;
+}
+
+public Builder 
remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner 
remoteLogMetadataTopicPartitioner) {
+this.remoteLogMetadataTopicPartitioner = 
Objects.requireNonNull(remoteLogMetadataTopicPartitioner);
+return this;
+}
+
+public Builder overrideRemoteLogMetadataManagerProps(Map overrideRemoteLogMetadataManagerProps) {
+this.overrideRemoteLogMetadataManagerProps = 
Objects.requireNonNull(overrideRemoteLogMetadataManagerProps);
+return this;
+}
+
+public TopicBasedRemoteLogMetadataManager build() {
+String logDir = 
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
+TopicBasedRemoteLogMetadataManager 
topicBasedRemoteLogMetadataManager = new 
TopicBasedRemoteLogMetadataManager(startConsumerThread);
+
+// Initialize TopicBasedRemoteLogMetadataManager.
+Map<String, Object> configs = new HashMap<>();
+configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+configs.put(BROKER_ID, 0);
+configs.put(LOG_DIR, logDir);
+configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 

Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-10 Thread via GitHub


junrao commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2104991266

   @clolov @nikramakrishnan : If an RPC is only used by the client, we don't 
need to bump up the IBP. However, ListOffsetRequest is used by both the client 
and the broker. If we don't bump the IBP, we can't test the logic for the new 
ListOffsetRequest on the broker, right?
   
   > Otherwise what happens is that the broker treats the version as 
non-existent while clients don't respect the configuration and still send the 
new version. 
   
   Hmm, normally, a client first sends an ApiVersionRequest to the broker to 
get exposed API versions. The broker decides whether to expose the latest 
version based on `unstable.api.versions.enable`. If the broker doesn't expose 
the latest version, the client shouldn't use it. Also, if somehow the client 
ignores this and indeed sends the new version, it seems that the broker will 
still take it. Could you explain the problem a bit more?
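
   In other words, the handshake itself keeps a well-behaved client from 
sending anything newer than the broker advertises. A toy illustration of that 
negotiation (the version numbers are hypothetical, and this is not the actual 
client code):

```java
public final class ListOffsetsVersionNegotiation {

    // The client caps the request version at whatever the broker advertised
    // in its ApiVersions response.
    static short chooseVersion(short clientNewest, short brokerAdvertisedMax) {
        return (short) Math.min(clientNewest, brokerAdvertisedMax);
    }

    public static void main(String[] args) {
        short clientNewest = 9;       // hypothetical newest ListOffsets version the client supports
        short stableBrokerMax = 8;    // broker hides the unstable latest version
        short unstableBrokerMax = 9;  // broker with unstable.api.versions.enable=true
        System.out.println(chooseVersion(clientNewest, stableBrokerMax));   // prints 8
        System.out.println(chooseVersion(clientNewest, unstableBrokerMax)); // prints 9
    }
}
```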


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on PR #15909:
URL: https://github.com/apache/kafka/pull/15909#issuecomment-2104982949

   Just to clarify what we're getting here, related to @AndrewJSchofield 's 
very valid point. With this we get the time between internal poll events, which 
do not translate exactly to calls to consumer.poll depending on the situation. 
So the log here will be very helpful to tune the config in cases where the 
delay that led to leaving the group was due to the client app taking too long 
to process messages after a call to poll. It would be less accurate in cases 
where the delay is due to the fetch not getting messages for instance, since we 
internally generate more poll events while at it. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException

2024-05-10 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-16699:
--

Assignee: Walker Carlson

> Have Streams treat InvalidPidMappingException like a ProducerFencedException
> 
>
> Key: KAFKA-16699
> URL: https://issues.apache.org/jira/browse/KAFKA-16699
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>
> KStreams is able to handle the ProducerFenced (among other errors) cleanly. 
> It does this by closing the task dirty and triggering a rebalance amongst the 
> worker threads to rejoin the group. The producer is also recreated. Due to 
> how streams works (writing to and reading from various topics), the 
> application is able to figure out the last thing the fenced producer 
> completed and continue from there.
> KStreams EOS V2 also trusts that any open transaction (including those whose 
> producer is fenced) will be aborted by the server. This is a key factor in 
> how it is able to operate. In EOS V1, the new InitProducerId fences and 
> aborts the previous transaction. In either case, we are able to reason about 
> the last valid state from the fenced producer and how to proceed.
> h2. InvalidPidMappingException ≈ ProducerFenced
> I argue that InvalidPidMappingException can be handled in the same way. Let 
> me explain why.
> There are two cases we see this error:
>  # 
>  
>  {{txnManager.getTransactionState(transactionalId).flatMap {  case None => 
> Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
>  # 
>  
>  {{if (txnMetadata.producerId != producerId)  
> Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
> h3. Case 1
> We are missing a value in the transactional state map for the transactional 
> ID. Under normal operations, this is only possible when the transactional ID 
> expires via the mechanism described above after 
> {{transactional.id.expiration.ms}} of inactivity. In this case, there is no 
> state that needs to be reconciled. It is safe to just rebalance and rejoin 
> the group with a new producer. We probably don’t even need to close the task 
> dirty, but it doesn’t hurt to do so.
> h3. Case 2
> This is a bit more interesting. It says that we have transactional state, but 
> the producer ID in the request does not match the producer ID associated with 
> the transactional ID on the broker. How can this happen?
> It is possible that a new producer instance B with the same transactional ID 
> was created after the transactional state expired for instance A. Given there 
> is no state on the server when B joins, it will get a totally new producer 
> ID. If the original producer A comes back, it will have state for this 
> transactional ID but the wrong producer ID.
> In this case, the old producer ID is fenced, it’s just the normal epoch-based 
> fencing logic doesn’t apply. We can treat it the same however.
> h2. Summary
> As described in the cases above, any time we encounter the InvalidPidMapping 
> during normal operation, the previous producer was either finished with its 
> operations or was fenced. Thus, it is safe to close the dirty and rebalance + 
> rejoin the group just as we do with the ProducerFenced exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1597000322


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) {
  * member to {@link MemberState#JOINING}, so that it rejoins the group.
  */
 public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);
 if (pollTimer.isExpired()) {

Review Comment:
   agree, makes total sense, so I moved the calculation to the timer with an 
`isExpiredBy`. One small twist to what I understand you were suggesting: I 
kept the `isExpired` check, just to avoid having the HBManager deduce whether 
the timer is expired from the `isExpiredBy` value. It seems better to let the 
timer own the semantics of when it's considered expired (it uses >= for 
instance, so this avoids bringing those semantics into the HBManager). Makes 
sense?
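
   For the record, a rough sketch of that division of responsibility (this is 
illustrative only, not the actual `Timer` API):

```java
// Illustrative sketch: the timer owns the expiry semantics, including the ">="
// edge case, and callers only ask whether -- and by how much -- the deadline
// was missed.
final class PollTimer {
    private final long intervalMs;
    private long deadlineMs;
    private long currentTimeMs;

    PollTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.currentTimeMs = nowMs;
        this.deadlineMs = nowMs + intervalMs;
    }

    /** Advance the timer's view of the current time without moving the deadline. */
    void update(long nowMs) {
        currentTimeMs = Math.max(currentTimeMs, nowMs);
    }

    boolean isExpired() {
        return currentTimeMs >= deadlineMs;
    }

    /** How far past the deadline we are; 0 if the deadline has not been missed. */
    long isExpiredBy() {
        return Math.max(0L, currentTimeMs - deadlineMs);
    }

    /** Start a new poll interval from the current time. */
    void reset() {
        deadlineMs = currentTimeMs + intervalMs;
    }
}
```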



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1596994382


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) {
  * member to {@link MemberState#JOINING}, so that it rejoins the group.
  */
 public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);
 if (pollTimer.isExpired()) {
-logger.debug("Poll timer has been reset after it had expired");
+logger.warn("Time between subsequent calls to poll() was longer 
than the configured" +
+"max.poll.interval.ms, exceeded by %s ms. This typically 
implies that the " +

Review Comment:
   yeap, my bad, I had found it too so it's fixed in a commit above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Validate at least one control record [kafka]

2024-05-10 Thread via GitHub


jsancio commented on code in PR #15912:
URL: https://github.com/apache/kafka/pull/15912#discussion_r1596977565


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -256,7 +256,7 @@ public void appendControlMessages(MemoryRecordsCreator 
valueCreator) {
 }
 
 private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) 
{
-// Confirm that it is at most one batch and it is a control record
+// Confirm that it is one control batch and it is at least one control 
record

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104915418

   @edoardocomar In general connectors do have to add a configuration like this 
eventually, because users have different tolerances for errors. Some users want 
the errors to cause the connector to become FAILED, so that they can see the 
exception in the REST API and retry it explicitly. Other users want the 
connector to retry internally infinitely, and not fail for any reason.
   
   MM2 has a _lot_ of operations that can fail, and virtually none of them 
cause the connector to fail. The reason for this is that MM2 has dedicated 
mode, where there isn't a REST API to surface errors or perform external 
retries, so external retries are very expensive. It is definitely something 
that could be fixed eventually with something like a "strict mode" 
configuration. We've also considered ways to address this from the framework 
side, with retry policies and automatic restarts, but none of that has been 
fully designed or implemented yet.
   
   I think we should not block this fix on solving that more general problem. 
If there is a permissions error loading the checkpoints, MM2 should log that 
and then degrade gracefully to the current behavior. A follow-up KIP that adds 
a "strict mode" could surface this failure and make the new permission 
required.
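
   A minimal sketch of that "log and degrade gracefully" behavior, assuming a 
hypothetical loadCheckpoints() helper (this is not the actual 
MirrorCheckpointTask code):

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CheckpointLoadFallback {
    private static final Logger log = LoggerFactory.getLogger(CheckpointLoadFallback.class);

    Map<TopicPartition, OffsetAndMetadata> loadCheckpointsOrDegrade() {
        try {
            return loadCheckpoints();
        } catch (AuthorizationException e) {
            // Missing permission on the checkpoints topic: log it and fall back to
            // the pre-existing behavior (no previously emitted checkpoints restored).
            log.warn("Could not read checkpoints topic; continuing without restored checkpoints", e);
            return Collections.emptyMap();
        }
    }

    // Hypothetical stand-in for reading previously emitted checkpoints.
    Map<TopicPartition, OffsetAndMetadata> loadCheckpoints() {
        return Collections.emptyMap();
    }
}
```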
   
   In practical terms, without a configuration and with the graceful 
degradation implementation, we can get this into 3.8.
   If you're interested in the configuration, that will delay this feature 
until 4.0. I'm fine with either, but I think the current behavior has caused 
such considerable friction in the community that we should prefer a 3.8 release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16699: Have Streams treat InvalidPidMappingException like a ProducerFencedException [kafka]

2024-05-10 Thread via GitHub


wcarlson5 opened a new pull request, #15919:
URL: https://github.com/apache/kafka/pull/15919

   KStreams is able to handle the ProducerFenced (among other errors) cleanly. 
It does this by closing the task dirty and triggering a rebalance amongst the 
worker threads to rejoin the group. The producer is also recreated. Due to how 
streams works (writing to and reading from various topics), the application is 
able to figure out the last thing the fenced producer completed and continue 
from there. InvalidPidMappingException should be treated the same way.
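
   A minimal sketch of the intended handling (the recovery helpers below are 
hypothetical placeholders, not Streams internals):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Illustrative only: both exceptions mean the old producer can no longer commit
// anything, so the task is closed dirty, the producer recreated, and the thread
// rejoins the group.
class TransactionalTaskRecovery {
    void commitOrRecover(Producer<byte[], byte[]> producer) {
        try {
            producer.commitTransaction();
        } catch (ProducerFencedException | InvalidPidMappingException e) {
            closeTaskDirty();
            rejoinGroup();
        }
    }

    void closeTaskDirty() { /* hypothetical placeholder */ }

    void rejoinGroup() { /* hypothetical placeholder */ }
}
```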
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException

2024-05-10 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson updated KAFKA-16699:
---
Description: 
KStreams is able to handle the ProducerFenced (among other errors) cleanly. It 
does this by closing the task dirty and triggering a rebalance amongst the 
worker threads to rejoin the group. The producer is also recreated. Due to how 
streams works (writing to and reading from various topics), the application is 
able to figure out the last thing the fenced producer completed and continue 
from there.

KStreams EOS V2 also trusts that any open transaction (including those whose 
producer is fenced) will be aborted by the server. This is a key factor in how 
it is able to operate. In EOS V1, the new InitProducerId fences and aborts the 
previous transaction. In either case, we are able to reason about the last 
valid state from the fenced producer and how to proceed.
h2. InvalidPidMappingException ≈ ProducerFenced

I argue that InvalidPidMappingException can be handled in the same way. Let me 
explain why.

There are two cases we see this error:


 # 
 
 {{txnManager.getTransactionState(transactionalId).flatMap {  case None => 
Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
 # 
 
 {{if (txnMetadata.producerId != producerId)  
Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}

h3. Case 1

We are missing a value in the transactional state map for the transactional ID. 
Under normal operations, this is only possible when the transactional ID 
expires via the mechanism described above after 
{{transactional.id.expiration.ms}} of inactivity. In this case, there is no 
state that needs to be reconciled. It is safe to just rebalance and rejoin the 
group with a new producer. We probably don’t even need to close the task dirty, 
but it doesn’t hurt to do so.
h3. Case 2

This is a bit more interesting. It says that we have transactional state, but 
the producer ID in the request does not match the producer ID associated with 
the transactional ID on the broker. How can this happen?

It is possible that a new producer instance B with the same transactional ID 
was created after the transactional state expired for instance A. Given there 
is no state on the server when B joins, it will get a totally new producer ID. 
If the original producer A comes back, it will have state for this 
transactional ID but the wrong producer ID.

In this case, the old producer ID is fenced, it’s just the normal epoch-based 
fencing logic doesn’t apply. We can treat it the same however.
h2. Summary

As described in the cases above, any time we encounter the InvalidPidMapping 
during normal operation, the previous producer was either finished with its 
operations or was fenced. Thus, it is safe to close the dirty and rebalance + 
rejoin the group just as we do with the ProducerFenced exception.

> Have Streams treat InvalidPidMappingException like a ProducerFencedException
> 
>
> Key: KAFKA-16699
> URL: https://issues.apache.org/jira/browse/KAFKA-16699
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Walker Carlson
>Priority: Major
>
> KStreams is able to handle the ProducerFenced (among other errors) cleanly. 
> It does this by closing the task dirty and triggering a rebalance amongst the 
> worker threads to rejoin the group. The producer is also recreated. Due to 
> how streams works (writing to and reading from various topics), the 
> application is able to figure out the last thing the fenced producer 
> completed and continue from there.
> KStreams EOS V2 also trusts that any open transaction (including those whose 
> producer is fenced) will be aborted by the server. This is a key factor in 
> how it is able to operate. In EOS V1, the new InitProducerId fences and 
> aborts the previous transaction. In either case, we are able to reason about 
> the last valid state from the fenced producer and how to proceed.
> h2. InvalidPidMappingException ≈ ProducerFenced
> I argue that InvalidPidMappingException can be handled in the same way. Let 
> me explain why.
> There are two cases we see this error:
>  # 
>  
>  {{txnManager.getTransactionState(transactionalId).flatMap {  case None => 
> Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
>  # 
>  
>  {{if (txnMetadata.producerId != producerId)  
> Left(Errors.INVALID_PRODUCER_ID_MAPPING)}}
> h3. Case 1
> We are missing a value in the transactional state map for the transactional 
> ID. Under normal operations, this is only possible when the transactional ID 
> expires via the mechanism described above after 
> {{transactional.id.expiration.ms}} of inactivity. In this case, there is no 
> state that needs to be reconciled. It is safe to just rebalance and rejoin 
> the group with a new producer. We 

[jira] [Created] (KAFKA-16699) Have Streams treat InvalidPidMappingException like a ProducerFencedException

2024-05-10 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-16699:
--

 Summary: Have Streams treat InvalidPidMappingException like a 
ProducerFencedException
 Key: KAFKA-16699
 URL: https://issues.apache.org/jira/browse/KAFKA-16699
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Walker Carlson






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596969230


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, 
RequestFuture future)
 }
 
 // visible for testing
-synchronized RequestFuture<Void> sendHeartbeatRequest() {
+public synchronized RequestFuture<Void> sendHeartbeatRequest() {

Review Comment:
   This change is no longer valid. I was trying out some things to make the 
`WorkerCoordinator` test work and this came through because of that. Will 
remove.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: fix LogValidatorTest#checkNonCompressed [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on PR #15904:
URL: https://github.com/apache/kafka/pull/15904#issuecomment-2104903142

   > Have the 30 test failures been triaged?
   
   |test|jira|
   |---|---|
   |testSyncTopicConfigs|https://issues.apache.org/jira/browse/KAFKA-15945|
   |testReplicateSourceDefault|https://issues.apache.org/jira/browse/KAFKA-15292|
   |testProduceConsumeWithWildcardAcls|https://issues.apache.org/jira/browse/KAFKA-16697|
   |testFenceMultipleBrokers|https://issues.apache.org/jira/browse/KAFKA-16634|
   |testSeparateOffsetsTopic|https://issues.apache.org/jira/browse/KAFKA-14089|
   |testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl|https://issues.apache.org/jira/browse/KAFKA-8250|
   |testDescribeQuorumReplicationSuccessful|https://issues.apache.org/jira/browse/KAFKA-15104|
   |testDescribeQuorumStatusSuccessful|https://issues.apache.org/jira/browse/KAFKA-16174|
   |testTaskRequestWithOldStartMsGetsUpdated|https://issues.apache.org/jira/browse/KAFKA-16136|
   |testConsumptionWithBrokerFailures|https://issues.apache.org/jira/browse/KAFKA-15146|
   |testCoordinatorFailover|https://issues.apache.org/jira/browse/KAFKA-16024|
   |testBrokerHeartbeatDuringMigration|https://issues.apache.org/jira/browse/KAFKA-15963|
   |testAbortTransactionTimeout|https://issues.apache.org/jira/browse/KAFKA-15772|
   |testMultiConsumerStickyAssignor|https://issues.apache.org/jira/browse/KAFKA-15934|
   |testDynamicIpConnectionRateQuota|https://issues.apache.org/jira/browse/KAFKA-16698|
   |testCreateClusterAndPerformReassignment|https://issues.apache.org/jira/browse/KAFKA-15103|
   |shouldBootstrapTwoBrokersWithFollowerThrottle|https://issues.apache.org/jira/browse/KAFKA-4184|
   Besides, the thread leaks should be fixed by #15886. Hence, I will rebase 
code to trigger QA again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16698) Fix flaky kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota

2024-05-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16698:
--

 Summary: Fix flaky 
kafka.network.DynamicConnectionQuotaTest#testDynamicIpConnectionRateQuota
 Key: KAFKA-16698
 URL: https://issues.apache.org/jira/browse/KAFKA-16698
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


{code:java}
org.opentest4j.AssertionFailedError: Timed out waiting for connection rate 
update to propagate  at 
app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)   at 
app//org.junit.jupiter.api.Assertions.fail(Assertions.java:138)  at 
app//kafka.network.DynamicConnectionQuotaTest.updateIpConnectionRate(DynamicConnectionQuotaTest.scala:279)
   at 
app//kafka.network.DynamicConnectionQuotaTest.testDynamicIpConnectionRateQuota(DynamicConnectionQuotaTest.scala:255)
 at 
java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
 at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
  at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
  at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
   at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-10 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845421#comment-17845421
 ] 

Philip Nee commented on KAFKA-16687:


"Looks like it's caused by this" - mind elaborate?

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java Project which using Maven dependency Kafka-clients with 
> 3.7.0 version.
> My Java application logic is to use Kafka Consumer to poll Kakfa broker topic 
>  continuously. 
> I have configured my Java application with JVM options with -Xms8G -Xmx8G  
> -XX:MaxMetaspaceSize=4G, and then run it. 
> Also, there are 16G physical memory on my virtual machine. 
> After my Java application running a long time, I have found that resident 
> memory of the Java Process was being grown to more than 14G.
> In the end, the Java process ate Swap space. 
> I checked it with jmap -heap pid, and found heap memory usage is Ok. 
> Also with Native Memory Tracking [jcmd pid Native.memory detail.diff], I 
> found that it's caused by [NMT Internal] memory,  which created by 
> Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate 
> memory.
> And I check it the Kafka-clients source code, it have codes with use 
> "sun.misc.unsafe" to allocate memory.  And MaxMetaspaceSize not work for it . 
>  
> Could you help to check it? How could I to stop this growing native memory to 
> avoid my System hang?  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KafkaDockerWrapper: correct KAFKA_HEAP_OPTS server property [kafka]

2024-05-10 Thread via GitHub


omkreddy merged PR #15345:
URL: https://github.com/apache/kafka/pull/15345


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596961414


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
 );
 }
 
+@Test
+public void testPollTimeoutExpiry() throws Exception {
+// This is a fabricated test to ensure that a poll timeout expiry 
happens. The tick thread awaits on
+// task#stop method which is blocked. The timeouts have been set 
accordingly
+workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(20)));
+workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(40)));
+connect = connectBuilder
+.numBrokers(1)
+.numWorkers(1)
+.build();
+
+connect.start();
+
+connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not 
brought up in time");
+
+Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
+connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSourceConnector.class.getName());
+connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+
connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG,
 Objects.requireNonNull(TASK_STOP));
+
+connect.configureConnector(CONNECTOR_NAME, 
connectorWithBlockingTaskStopConfig);
+
+connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+);
+
+try (LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+connect.restartTask(CONNECTOR_NAME, 0);
+TestUtils.waitForCondition(() -> 
logCaptureAppender.getEvents().stream().anyMatch(e -> 
e.getLevel().equals("WARN")) &&

Review Comment:
   Could this assertion be added to an existing BlockingConnectorTest? The 
blocking plugins are inherently slow to use so we should avoid adding more 
instances of them.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, 
RequestFuture future)
 }
 
 // visible for testing
-synchronized RequestFuture<Void> sendHeartbeatRequest() {
+public synchronized RequestFuture<Void> sendHeartbeatRequest() {

Review Comment:
   I would respect the "visible for testing" comment above, and leave this 
package-local.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +268,20 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {
+Stage currentStage = listener.onPollTimeoutExpiry();
+log.warn("worker poll timeout has expired. This means the time between 
subsequent calls to poll() " +
+"in DistributedHerder tick() method was longer than the configured 
rebalance.timeout.ms. " +

Review Comment:
   From a user perspective, the class and method names are irrelevant, and 
bringing up irrelevant details in diagnostics can be misleading. "The last 
thing the worker was doing was: {} and may contribute to this timeout" is much 
more understandable and still gets the point across.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection 
connectors, Collection connectors, 
Collection tasks);
+
+
+/**
+ * Invoked when a worker experiences a poll timeout expiry. Invoking this 
method allows getting
+ * the stage which was currently being executed when the poll timeout 
happened. The default implementation
+ * returns null
+ * @return The current stage being executed. Could be null
+ */
+default Stage onPollTimeoutExpiry() {

Review Comment:
   This is an internal interface, unless this default method actually makes 
sense on it's own I wouldn't add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Validate at least one control record [kafka]

2024-05-10 Thread via GitHub


junrao commented on code in PR #15912:
URL: https://github.com/apache/kafka/pull/15912#discussion_r1596962906


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -256,7 +256,7 @@ public void appendControlMessages(MemoryRecordsCreator 
valueCreator) {
 }
 
 private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) 
{
-// Confirm that it is at most one batch and it is a control record
+// Confirm that it is one control batch and it is at least one control 
record

Review Comment:
   validateMemoryRecordAndReturnCount => validateMemoryRecordsAndReturnCount
   memoryRecord => memoryRecords
   Also, there is an existing typo creatte on line 268.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16697) Fix flaky kafka.api.SaslGssapiSslEndToEndAuthorizationTest#testProduceConsumeWithWildcardAcls

2024-05-10 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16697:
--

 Summary: Fix flaky 
kafka.api.SaslGssapiSslEndToEndAuthorizationTest#testProduceConsumeWithWildcardAcls
 Key: KAFKA-16697
 URL: https://issues.apache.org/jira/browse/KAFKA-16697
 Project: Kafka
  Issue Type: Test
Reporter: Chia-Ping Tsai


{code:java}
org.opentest4j.AssertionFailedError: Should have been zero expired connections 
killed: 1(total=0.0) ==> expected: <0> but was: <1>  at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)  at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)  at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:664)  at 
app//kafka.api.EndToEndAuthorizationTest.$anonfun$confirmReauthenticationMetrics$1(EndToEndAuthorizationTest.scala:202)
  at 
app//kafka.api.EndToEndAuthorizationTest.$anonfun$confirmReauthenticationMetrics$1$adapted(EndToEndAuthorizationTest.scala:200)
  at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)  
  at app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574) 
  at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933)   
at 
app//kafka.api.EndToEndAuthorizationTest.confirmReauthenticationMetrics(EndToEndAuthorizationTest.scala:200)
 at 
app//kafka.api.EndToEndAuthorizationTest.testProduceConsumeWithWildcardAcls(EndToEndAuthorizationTest.scala:236)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
  at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
  at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChai
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]

2024-05-10 Thread via GitHub


johnnychhsu commented on PR #15906:
URL: https://github.com/apache/kafka/pull/15906#issuecomment-2104887677

   @C0urante thanks for the review!
   Just updated it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-10 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1596953771


##
raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:
##
@@ -79,20 +81,79 @@ void testRemoveVoter() {
 );
 }
 
+@Test
+void testIsVoterWithDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid();
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty(;
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(2, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty(;
+}
+
+@Test
+void testIsVoterWithoutDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 
2, 3), false);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty(;
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid();
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, 
Optional.of(Uuid.randomUuid();
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty(;
+}
+
+@Test
+void testStandaloneAndIsOnlyVoter() {
+Map aVoterMap = 
voterMap(Arrays.asList(1), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isOnlyVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, 
Optional.of(Uuid.randomUuid();
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(1, Optional.empty(;
+assertFalse(
+voterSet.isOnlyVoter(ReplicaKey.of(4, 
aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isOnlyVoter(ReplicaKey.of(4, Optional.empty(;
+}
+
+@Test
+void testNotStandaloneAndIsOnlyVoter() {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16526; Quorum state data version 1 [kafka]

2024-05-10 Thread via GitHub


jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1596953465


##
raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java:
##
@@ -73,23 +88,35 @@ private QuorumState buildQuorumState(Set voters) {
 );
 }
 
-@Test
-public void shouldRecordVoterQuorumState() {
-QuorumState state = buildQuorumState(Utils.mkSet(localId, 1, 2));
+@ParameterizedTest
+@ValueSource(shorts = {0, 1})
+public void shouldRecordVoterQuorumState(short kraftVersion) {
+boolean withDirectoryId = kraftVersion > 0;
+Map voterMap = 
VoterSetTest.voterMap(Utils.mkSet(1, 2), withDirectoryId);
+voterMap.put(localId, VoterSetTest.voterNode(ReplicaKey.of(localId, 
Optional.of(localDirectoryId;

Review Comment:
   Yes. Fixed. Excuse the minor errors.



##
raft/src/test/java/org/apache/kafka/raft/internals/VoterSetTest.java:
##
@@ -79,20 +81,79 @@ void testRemoveVoter() {
 );
 }
 
+@Test
+void testIsVoterWithDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), true);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(aVoterMap.get(1).voterKey()));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(2, aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(
+voterSet.isVoter(ReplicaKey.of(4, aVoterMap.get(1).voterKey().directoryId()))
+);
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testIsVoterWithoutDirectoryId() {
+Map aVoterMap = voterMap(Arrays.asList(1, 2, 3), false);
+VoterSet voterSet = new VoterSet(new HashMap<>(aVoterMap));
+
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.empty())));
+assertTrue(voterSet.isVoter(ReplicaKey.of(1, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.of(Uuid.randomUuid()))));
+assertFalse(voterSet.isVoter(ReplicaKey.of(4, Optional.empty())));
+}
+
+@Test
+void testStandaloneAndIsOnlyVoter() {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596952044


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {

Review Comment:
   Yeah, the tick thread stage is a best effort and can be misleading. However, if something has blocked long enough to cause a poll timeout, it is likely to keep blocking for the additional time the heartbeat thread needs to notice it.
   
   I think the ideal use case is that this error pops up in a worker log 3 or more times before an operator has a chance to remediate it, and if the majority of the logs blame connector-xyz, the operator can STOP that connector.
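   For readers following the thread, a minimal sketch of the kind of worker-side logging being discussed, assuming a hypothetical helper that snapshots the worker's current assignment (the actual method names and message wording in the PR may differ):
   
    @Override
    protected void handlePollTimeoutExpiry() {
        // Hedged sketch: name the connectors/tasks this worker owns so that, across
        // several occurrences, operators can see which connector is likely blocking
        // the tick thread.
        log.warn("Worker poll timeout expired; the worker's tick thread was likely blocked. "
            + "Current assignment: {}", assignmentSnapshot()); // assignmentSnapshot() is illustrative only
        maybeLeaveGroup("worker poll timeout expired");
    }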
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]

2024-05-10 Thread via GitHub


johnnychhsu commented on code in PR #15906:
URL: https://github.com/apache/kafka/pull/15906#discussion_r1596951379


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -353,6 +353,9 @@ private  void 
awaitTaskConfigurations(MirrorMaker mm,
 .map(TaskInfo::config)
 .allMatch(predicate);
 } catch (Exception ex) {
+if (ex instanceof RebalanceNeededException) {
+throw ex;
+}

Review Comment:
   you are right, let me add the comment, thanks!
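   A hedged sketch of what the requested inline comment could look like in the catch block quoted above (the surrounding retry structure is abbreviated):
   
    } catch (Exception ex) {
        if (ex instanceof RebalanceNeededException) {
            // The herder is mid-rebalance: rethrow so the surrounding wait/retry
            // loop tries again instead of treating this transient state as a failure.
            throw ex;
        }
        // Other exceptions fall through to the existing handling.
    }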



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16684: fix flaky DedicatedMirrorIntegrationTest [kafka]

2024-05-10 Thread via GitHub


johnnychhsu commented on code in PR #15906:
URL: https://github.com/apache/kafka/pull/15906#discussion_r1596951056


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -260,9 +261,8 @@ public void testMultiNodeCluster() throws Exception {
 awaitConnectorTasksStart(mirrorMakers.get("node 0"), 
MirrorHeartbeatConnector.class, sourceAndTarget);
 
 // Create one topic per Kafka cluster per MirrorMaker node
-final int topicsPerCluster = numNodes;

Review Comment:
   thanks for the review!
   totally agree, let me move it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-05-10 Thread via GitHub


mumrah opened a new pull request, #15918:
URL: https://github.com/apache/kafka/pull/15918

   When becoming the active KRaftMigrationDriver, there is another race 
condition similar to KAFKA-16171. This time, the race is due to a stale read 
from ZK. After writing to `/controller` and `/controller_epoch`, it is possible 
that a read on `/migration` is not linear with the writes that were just made. 
In other words, we get a stale read on `/migration`. This leads to an inability 
to sync metadata to ZK due to incorrect zkVersion on the migration Znode. 
   
   The non-linearizability of reads is in fact documented behavior for ZK, so 
we need to handle it.
   
   To fix the stale read, this patch adds a write to `/migration` after 
updating `/controller` and `/controller_epoch`. This allows us to learn the 
correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER 
state.
   
   This patch also adds a check on the current leader epoch when running 
certain events in KRaftMigrationDriver. Historically, we did not include this 
check because it is not necessary for correctness. Writes to ZK are gated on 
the  `/controller_epoch` zkVersion, and RPCs sent to brokers are gated on the 
controller epoch. However, during a time of rapid failover, there is a lot of 
processing happening on the controller (i.e., full metadata sync to ZK and full 
UMRs sent to brokers), so it is best to avoid running events we know will fail.
   
   There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to the past tense to reflect the fact that the operations have already happened by the time the log message is created.
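   As an illustration only (method names here are made up, not the actual KRaftMigrationDriver API), the ordering described above is roughly:
   
    claimControllerLeadership();        // writes /controller and /controller_epoch
    // A plain read of /migration here may be stale, since ZK reads are not
    // linearizable with the writes above; writing to /migration instead returns
    // its current zkVersion.
    int migrationZkVersion = touchMigrationZNode();
    // Only after learning that zkVersion do we leave BECOME_CONTROLLER, so the
    // later metadata syncs gated on it no longer fail with a version mismatch.
    leaveBecomeControllerState(migrationZkVersion);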


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Validate at least one control record [kafka]

2024-05-10 Thread via GitHub


jsancio commented on PR #15912:
URL: https://github.com/apache/kafka/pull/15912#issuecomment-2104840600

   @junrao @chia7712 thanks for the reviews. PR is ready for another round.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR; Validate at least one control record [kafka]

2024-05-10 Thread via GitHub


jsancio commented on code in PR #15912:
URL: https://github.com/apache/kafka/pull/15912#discussion_r1596923121


##
raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java:
##
@@ -284,6 +284,8 @@ private int 
validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
 );
 } else if (numberOfRecords == null) {
 throw new IllegalArgumentException("valueCreator didn't create a 
batch with the count");
+} else if (numberOfRecords < 1) {

Review Comment:
   Added the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]

2024-05-10 Thread via GitHub


edoardocomar commented on PR #15910:
URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104835965

   Hi @gharris1727, we're now handling errors when loading the Checkpoints topic (we still have to add unit tests).
   
   Specifically, we tested the not-authorized-to-read case, which the existing KafkaBasedLog was not handling well. At this stage the task start would fail, which to us seems an improvement as it is detectable and actionable (expecting the change to be noted in the release notes).
   
   This looks to us like better behavior than reverting to the old one on failure, as maintaining and testing two modes of operation seems too complex.
   
   Do you still think we need a KIP to introduce yet another config to choose between the old behavior (default) and the new one (arguably better in the eyes of this PR's authors ...)?
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


AndrewJSchofield commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1596861973


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) {
  * member to {@link MemberState#JOINING}, so that it rejoins the group.
  */
 public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);
 if (pollTimer.isExpired()) {
-logger.debug("Poll timer has been reset after it had expired");
+logger.warn("Time between subsequent calls to poll() was longer 
than the configured" +
+"max.poll.interval.ms, exceeded by %s ms. This typically 
implies that the " +

Review Comment:
   `{}`?
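   For context, the logger here is SLF4J-style, which substitutes `{}` placeholders; a `%s` would only be expanded by `String.format`. A minimal illustration with a made-up variable name:
   
    long exceededMs = 150L; // hypothetical value
    logger.warn("Consumer poll timeout has expired, exceeded by {} ms.", exceededMs);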



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -255,11 +257,15 @@ public long maximumTimeToWait(long currentTimeMs) {
  * member to {@link MemberState#JOINING}, so that it rejoins the group.
  */
 public void resetPollTimer(final long pollMs) {
+pollTimer.update(pollMs);
 if (pollTimer.isExpired()) {

Review Comment:
   I would rather have a method added to `Timer` such as `long hasExpiredBy()` 
so the check for expiration and the calculation of by how much is encapsulated 
in the timer itself.
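   A hedged sketch of what such a helper could look like; the field names are assumptions rather than the actual `Timer` internals:
   
    // Returns how far past the deadline the timer is, or 0 if it has not expired yet.
    public long hasExpiredBy() {
        return Math.max(0L, currentTimeMs - deadlineMs); // assumed fields
    }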



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16687) Native memory leak by Unsafe_allocatememory in Kafka Clients 3.7.0

2024-05-10 Thread FTR (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845402#comment-17845402
 ] 

FTR commented on KAFKA-16687:
-

Looks like it's caused by this. I found some Sensor classes in the heap memory dump.
It looks good now.
I moved the KafkaConsumer instance out of the class I had used to encapsulate it, and then this issue was resolved. No native memory leak now!

> Native memory leak by Unsafe_allocatememory  in Kafka Clients  3.7.0
> 
>
> Key: KAFKA-16687
> URL: https://issues.apache.org/jira/browse/KAFKA-16687
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: FTR
>Assignee: Philip Nee
>Priority: Major
>
> I am building a Java project which uses the Maven dependency Kafka-clients, version 3.7.0.
> My Java application logic is to use a Kafka Consumer to poll a Kafka broker topic continuously.
> I have configured my Java application with the JVM options -Xms8G -Xmx8G -XX:MaxMetaspaceSize=4G, and then run it.
> Also, there are 16G of physical memory on my virtual machine.
> After my Java application had been running for a long time, I found that the resident memory of the Java process had grown to more than 14G.
> In the end, the Java process ate swap space.
> I checked it with jmap -heap pid, and found that heap memory usage is OK.
> Also, with Native Memory Tracking [jcmd pid VM.native_memory detail.diff], I found that it's caused by [NMT Internal] memory, which is created by Unsafe_allocatememory xxx.
> In my Java application, I don't use any NIO DirectByteBuffer to allocate memory.
> And when I checked the Kafka-clients source code, it has code that uses "sun.misc.unsafe" to allocate memory, and MaxMetaspaceSize does not apply to it.
>
> Could you help check it? How could I stop this growing native memory usage to avoid my system hanging?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16688) SystemTimer leaks resources on close

2024-05-10 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez resolved KAFKA-16688.
-
Resolution: Fixed

> SystemTimer leaks resources on close
> 
>
> Key: KAFKA-16688
> URL: https://issues.apache.org/jira/browse/KAFKA-16688
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.8.0
>Reporter: Gaurav Narula
>Assignee: Gaurav Narula
>Priority: Major
>
> We observe some thread leaks with thread name {{executor-client-metrics}}.
> This may happen because {{SystemTimer}} doesn't attempt to shutdown its 
> executor service properly.
> Refer: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15885/1/tests
>  and tests with {{initializationError}} in them for stacktrace



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on code in PR #15917:
URL: https://github.com/apache/kafka/pull/15917#discussion_r1596894830


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
+
+class RemoteLogMetadataManagerTestUtils {

Review Comment:
   > Will the TopicBasedRemoteLogMetadataManagerHarness class be removed?
   
   Yep, the new test approach is applied to only one test class in this PR. I plan to rewrite all the storage tests after this PR gets approved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on PR #15911:
URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104790646

   > Can you actually not remove this because I was starting to use it in the 
GetOffsetShellToolTest? The problem I faced with using the topic-based RLMM is 
that it requires the bootstrap-servers to initialise correctly. However, said 
bootstrap-servers are not present at initialisation time (due to determining 
the port dynamically)
   
   @clolov Could you take a look at #15917? I tried another way to write tests for the storage module: the `ClusterInstance` gets created and the port is bound during the testing phase. Maybe that can fix the problem you described.
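   For readers following along, the style being proposed looks roughly like the sketch below; the class and test names are made up, and the exact wiring of the RLMM configuration is omitted:
   
    @ExtendWith(ClusterTestExtensions.class)
    class SomeStorageIntegrationTest {
        private final ClusterInstance cluster;
   
        SomeStorageIntegrationTest(ClusterInstance cluster) {
            this.cluster = cluster;
        }
   
        @ClusterTest
        void testWithBootstrapServers() {
            // The cluster (and its ports) already exists when the test body runs,
            // so the topic-based RLMM can be configured with real bootstrap servers.
            String bootstrapServers = cluster.bootstrapServers();
            // ... configure TopicBasedRemoteLogMetadataManager with bootstrapServers ...
        }
    }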


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]

2024-05-10 Thread via GitHub


kamalcph commented on code in PR #15917:
URL: https://github.com/apache/kafka/pull/15917#discussion_r1596873472


##
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java:
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
+
+class RemoteLogMetadataManagerTestUtils {

Review Comment:
   Will the `TopicBasedRemoteLogMetadataManagerHarness` class be removed?



Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-10 Thread via GitHub


kamalcph commented on PR #15911:
URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104768094

   > Can you actually not remove this because I was starting to use it in the 
GetOffsetShellToolTest? The problem I faced with using the topic-based RLMM is 
that it requires the bootstrap-servers to initialise correctly. However, said 
bootstrap-servers are not present at initialisation time (due to determining 
the port dynamically)
   
   We can get the port from the KafkaBroker using the `boundPort` method. If it does not work for you, could you please open a draft PR? I'll take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-10 Thread via GitHub


clolov commented on PR #15911:
URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104745407

   Can you actually not remove this because I was starting to use it in the 
GetOffsetShellTool? The problem I faced with using the topic-based RLMM is that 
it requires the bootstrap-servers to start correctly. However, said 
bootstrap-servers are not present at initialisation time (due to determining 
the port dynamically)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-10 Thread Johnson Okorie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845380#comment-17845380
 ] 

Johnson Okorie edited comment on KAFKA-16692 at 5/10/24 2:44 PM:
-

Thanks [~jolshan], looking forward to seeing your findings!


was (Author: JIRAUSER305348):
Thanks [~jolshan], looking forward to see you findings!

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance 

[jira] [Commented] (KAFKA-16692) InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka 3.5 to 3.6

2024-05-10 Thread Johnson Okorie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845380#comment-17845380
 ] 

Johnson Okorie commented on KAFKA-16692:


Thanks [~jolshan], looking forward to seeing your findings!

> InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled when upgrading from kafka 3.5 to 3.6 
> 
>
> Key: KAFKA-16692
> URL: https://issues.apache.org/jira/browse/KAFKA-16692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.1
>Reporter: Johnson Okorie
>Assignee: Justine Olshan
>Priority: Major
>
> We have a kafka cluster running on version 3.5.2 that we are upgrading to 
> 3.6.1. This cluster has a lot of clients with exactly-once semantics enabled 
> and hence creating transactions. As we replaced brokers with the new 
> binaries, we observed lots of clients in the cluster experiencing the 
> following error:
> {code:java}
> 2024-05-07T09:08:10.039Z "tid": "" -- [Producer clientId=, 
> transactionalId=] Got error produce response with 
> correlation id 6402937 on topic-partition , retrying 
> (2147483512 attempts left). Error: NETWORK_EXCEPTION. Error Message: The 
> server disconnected before a response was received.{code}
> On inspecting the broker, we saw the following errors on brokers still 
> running Kafka version 3.5.2:
>  
> {code:java}
> message:     
> Closing socket for  because of error
> exception_exception_class:    
> org.apache.kafka.common.errors.InvalidRequestException
> exception_exception_message:    
> Received request api key ADD_PARTITIONS_TO_TXN with version 4 which is not 
> enabled
> exception_stacktrace:    
> org.apache.kafka.common.errors.InvalidRequestException: Received request api 
> key ADD_PARTITIONS_TO_TXN with version 4 which is not enabled
> {code}
> On the new brokers running 3.6.1 we saw the following errors:
>  
> {code:java}
> [AddPartitionsToTxnSenderThread-1055]: AddPartitionsToTxnRequest failed for 
> node 1043 with a network exception.{code}
>  
> I can also see this :
> {code:java}
> [AddPartitionsToTxnManager broker=1055]Cancelled in-flight 
> ADD_PARTITIONS_TO_TXN request with correlation id 21120 due to node 1043 
> being disconnected (elapsed time since creation: 11ms, elapsed time since 
> send: 4ms, request timeout: 3ms){code}
> We started investigating this issue and digging through the changes in 3.6, 
> we came across some changes introduced as part of KAFKA-14402 that we thought 
> might lead to this behaviour. 
> First we could see that _transaction.partition.verification.enable_ is 
> enabled by default and enables a new code path that culminates in us sending 
> version 4 ADD_PARTITIONS_TO_TXN requests to other brokers that are generated 
> [here|https://github.com/apache/kafka/blob/29f3260a9c07e654a28620aeb93a778622a5233d/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L269].
> From a 
> [discussion|https://lists.apache.org/thread/4895wrd1z92kjb708zck4s1f62xq6r8x] 
> on the mailing list, [~jolshan] pointed out that this scenario shouldn't be 
> possible as the following code paths should prevent version 4 
> ADD_PARTITIONS_TO_TXN requests being sent to other brokers:
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java#L130]
>  
> [https://github.com/apache/kafka/blob/525b9b1d7682ae2a527ceca83fedca44b1cba11a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala#L195]
> However, these requests are still sent to other brokers in our environment.
> On further inspection of the code, I am wondering if the following code path 
> could lead to this issue:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L500]
> In this scenario, we don't have any _NodeApiVersions_ available for the 
> specified nodeId and potentially skipping the _latestUsableVersion_ check. I 
> am wondering if it is possible that because _discoverBrokerVersions_ is set 
> to _false_ for the network client of the {_}AddPartitionsToTxnManager{_}, it 
> skips fetching {_}NodeApiVersions{_}? I can see that we create the network 
> client here:
> [https://github.com/apache/kafka/blob/c4deed513057c94eb502e64490d6bdc23551d8b6/core/src/main/scala/kafka/server/KafkaServer.scala#L641]
> The _NetworkUtils.buildNetworkClient_ method seems to create a network client 
> that has _discoverBrokerVersions_ set to {_}false{_}. 
> I was hoping I could get some assistance debugging this issue. Happy to 
> provide any additional information needed.
>  
>  
>  



--
This message was sent by Atlassian Jira

Re: [PR] KAFKA-16677: Replace ClusterType#ALL and ClusterType#DEFAULT by Array [kafka]

2024-05-10 Thread via GitHub


lianetm commented on code in PR #15897:
URL: https://github.com/apache/kafka/pull/15897#discussion_r1596833496


##
core/src/test/java/kafka/test/annotation/ClusterTest.java:
##
@@ -33,7 +33,7 @@
 @Retention(RUNTIME)
 @TestTemplate
 public @interface ClusterTest {
-Type clusterType() default Type.DEFAULT;
+Type[] clusterTypes() default {};

Review Comment:
   `ClusterTestDefaults` makes sense to me. Honestly, I wasn't thinking of it when suggesting having a default with all the types, simply because I didn't remember we had it. My point was more about having a default with all types; I agree with you that the right place for that default is `ClusterTestDefaults`.
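   As a small illustration of the distinction (using the attribute name from this PR, `clusterTypes`; the rename to `types` was only a suggestion):
   
    // Per-test override: run only against KRaft.
    @ClusterTest(clusterTypes = {Type.KRAFT})
    void testKraftOnly() { }
   
    // No override: fall back to the class-level @ClusterTestDefaults, whose
    // default is intended to cover all cluster types.
    @ClusterTest
    void testAllDefaults() { }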



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]

2024-05-10 Thread via GitHub


KevinZTW commented on PR #15884:
URL: https://github.com/apache/kafka/pull/15884#issuecomment-2104711204

   > @KevinZTW Please fix the build error
   
   sorry I just fixed it


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16264: Expose `producer.id.expiration.check.interval.ms` as dynamic broker configuration [kafka]

2024-05-10 Thread via GitHub


jeqo commented on PR #15888:
URL: https://github.com/apache/kafka/pull/15888#issuecomment-2104695522

   Thanks, @jolshan! I missed that one, and it helped me find a couple of missing steps. PTAL and let's confirm whether this requires a small KIP to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on PR #15909:
URL: https://github.com/apache/kafka/pull/15909#issuecomment-2104687487

   Done. I simplified what we log when the background thread realizes time's up and leaves the group to rejoin eventually (that's all the relevant info at that point). I then moved the log that details the expired max.poll.interval.ms to the place where we can give a more accurate exceeded time, which is the next application poll event that the background thread handles. I also updated the test to make sure it checks not only how the exceeded time is calculated, but also **where** it is calculated. Does that make sense? More accurate now indeed, thanks!
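   In spirit, the split described above might look like this hedged sketch (wording and variable names are illustrative, not the final code):
   
    // In the heartbeat manager's poll(): time is up, just record that we leave to rejoin.
    logger.debug("Poll timer expired; member is leaving the group and will rejoin on the next poll().");
   
    // On the next application poll event, where the real gap between poll() calls is known:
    long exceededMs = timeSinceLastPollMs - maxPollIntervalMs; // illustrative names
    logger.warn("Consumer poll timeout has expired, exceeded by {} ms.", exceededMs);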


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16445) PATCH method for connector configuration

2024-05-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16445:
--
Fix Version/s: 3.8.0

> PATCH method for connector configuration
> 
>
> Key: KAFKA-16445
> URL: https://issues.apache.org/jira/browse/KAFKA-16445
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Minor
> Fix For: 3.8.0
>
>
> As  [KIP-477: Add PATCH method for connector config in Connect REST 
> API|https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API]
>  suggests, we should introduce the PATCH method for connector configuration.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16695: Improve expired poll logging [kafka]

2024-05-10 Thread via GitHub


lianetm commented on code in PR #15909:
URL: https://github.com/apache/kafka/pull/15909#discussion_r1596716965


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -193,11 +193,12 @@ public NetworkClientDelegate.PollResult poll(long 
currentTimeMs) {
 }
 pollTimer.update(currentTimeMs);
 if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
-logger.warn("Consumer poll timeout has expired. This means the 
time between " +
+logger.warn("Consumer poll timeout has expired, exceeded by {} ms. 
This means the time between " +

Review Comment:
   Hey, good point, it would actually take this a step further, where it should indeed be more useful. As @AndrewJSchofield pointed out, the HB manager will notice sooner in practice (even sooner than the HB interval), but we do know when the next poll happens, so we can definitely get a more accurate exceeded time (in between calls to poll, which translate to poll events handled in this same manager). On it... thanks for the comments!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-10 Thread via GitHub


jeqo commented on PR #15893:
URL: https://github.com/apache/kafka/pull/15893#issuecomment-2104551065

   @C0urante thanks! Agree, we should leave this change of behavior out of the 
scope of this PR/KIP. I have returned to the previous behavior, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-13473) Log cleaner Dynamic configs aren't applied after a restart

2024-05-10 Thread Jeel Jotaniya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeel Jotaniya reassigned KAFKA-13473:
-

Assignee: Jeel Jotaniya

> Log cleaner Dynamic configs aren't applied after a restart
> --
>
> Key: KAFKA-13473
> URL: https://issues.apache.org/jira/browse/KAFKA-13473
> Project: Kafka
>  Issue Type: Bug
>  Components: config, core
>Affects Versions: 2.8.1
>Reporter: Tim Patterson
>Assignee: Jeel Jotaniya
>Priority: Minor
>
> Upon restarting kafka, dynamically configured log cleaner configs aren't 
> picked up and applied.
>  
> Here are some logs from a local kafka when I up the threads to 2 using the 
> kafka-config tool - Noting the last 2 lines where it starts up 2 log cleaner 
> threads.
>  
> {code:java}
> [2021-11-23 21:09:50,044] INFO [Admin Manager on Broker 1001]: Updating 
> brokers with new configuration : log.cleaner.threads -> 2 
> (kafka.server.ZkAdminManager)
> [2021-11-23 21:09:50,092] INFO Processing override for entityPath: 
> brokers/ with config: HashMap(log.cleaner.threads -> 2) 
> (kafka.server.DynamicConfigManager) log.cleaner.threads = 2
> [2021-11-23 21:09:50,113] INFO Shutting down the log cleaner. 
> (kafka.log.LogCleaner)
> [2021-11-23 21:09:50,114] INFO [kafka-log-cleaner-thread-0]: Shutting down 
> (kafka.log.LogCleaner)
> [2021-11-23 21:09:50,116] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> [2021-11-23 21:09:50,116] INFO [kafka-log-cleaner-thread-0]: Shutdown 
> completed (kafka.log.LogCleaner)
> [2021-11-23 21:09:50,119] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2021-11-23 21:09:50,178] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2021-11-23 21:09:50,181] INFO [kafka-log-cleaner-thread-1]: Starting 
> (kafka.log.LogCleaner){code}
> And now after a restart, at no point does it ever start 2 threads, even 
> though it clearly knows about the configs
>  
> {code:java}
> [2021-11-23 21:10:46,659] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2021-11-23 21:10:46,723] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2021-11-23 21:10:48,124] INFO Processing override for entityPath: 
> brokers/ with config: HashMap(log.cleaner.threads -> 2) 
> (kafka.server.DynamicConfigManager) log.cleaner.backoff.ms = 15000 
> log.cleaner.dedupe.buffer.size = 15000 log.cleaner.delete.retention.ms = 
> 8640 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 
> log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 
> 1.7976931348623157E308 log.cleaner.max.compaction.lag.ms = 
> 9223372036854775807 log.cleaner.min.cleanable.ratio = 0.5 
> log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 2{code}
>  
> When investigating from the kafka config tool all looks well.
>  
> {code:java}
> kafka-configs --bootstrap-server $BROKER_URL --entity-type brokers 
> --entity-default --describe --all | grep log.cleaner.threads 
> log.cleaner.threads=2 sensitive=false 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:log.cleaner.threads=2}{code}
>  
> But if you try to change the config you soon find out that all is not well (note 
> here it mentions the current value is 1 in the validation message)
>  
> {code:java}
> kafka-configs --bootstrap-server $BROKER_URL --entity-type brokers 
> --entity-default --alter --add-config log.cleaner.threads=3 Error while 
> executing config command with args '--bootstrap-server profile_kafka:9093 
> --entity-type brokers --entity-default --alter --add-config 
> log.cleaner.threads=3' java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.InvalidRequestException: Invalid config value 
> for resource ConfigResource(type=BROKER, name=''): Invalid value 
> org.apache.kafka.common.config.ConfigException: Log cleaner threads cannot be 
> increased to more than double the current value 1 for configuration Invalid 
> dynamic configuration{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on PR #15884:
URL: https://github.com/apache/kafka/pull/15884#issuecomment-2104544789

   @KevinZTW Please fix the build error


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: rewrite TopicBasedRemoteLogMetadataManagerTest by ClusterTestE… [kafka]

2024-05-10 Thread via GitHub


chia7712 opened a new pull request, #15917:
URL: https://github.com/apache/kafka/pull/15917

   This is the first PR to migrate the storage tests to the new test infra. With the new test infra, we can easily make those tests run on either a ZK or a KRaft cluster.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16654:Refactor kafka.test.annotation.Type and ClusterTestExtensions [kafka]

2024-05-10 Thread via GitHub


TaiJuWu opened a new pull request, #15916:
URL: https://github.com/apache/kafka/pull/15916

   *More detailed description of your change,
   As title.
   
   *Summary of testing strategy (including rationale)
   local test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596621705


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java:
##
@@ -76,7 +76,7 @@ public void assertAtLeastNumWorkersAreUp(int numWorkers, 
String detailMessage) t
 }
 
 /**
- * Assert that at least the requested number of workers are up and running.
+ * Assert that the exact number of workers are up and running.

Review Comment:
   I took the liberty of editing this comment in this PR. Hope that's ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16679 merge unit test down to the class of integration test [kafka]

2024-05-10 Thread via GitHub


KevinZTW commented on code in PR #15884:
URL: https://github.com/apache/kafka/pull/15884#discussion_r1596572793


##
tools/src/test/java/org/apache/kafka/tools/FeatureCommandTest.java:
##
@@ -47,22 +47,16 @@
 @ClusterTestDefaults(clusterType = Type.KRAFT)

Review Comment:
   Oh, you are right, I didn't notice that part. Just removed it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-10 Thread via GitHub


nikramakrishnan commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2104320732

   Given that we are marking the latest version as unstable in 
ListOffsetsResponse, it would make sense to not bump the IBP now as @clolov 
suggests, and only bump it when ListOffsetsResponse v9 is ready to be marked as 
stable. Doing this allows us to test v9 with `unstable.api.versions.enable`, 
and brokers and clients on IV0 will continue to work with v8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16688: Use helper method to shutdown ExecutorService [kafka]

2024-05-10 Thread via GitHub


soarez merged PR #15886:
URL: https://github.com/apache/kafka/pull/15886


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16480: Bump ListOffsets version, IBP version and mark last version of ListOffsets as unstable [kafka]

2024-05-10 Thread via GitHub


clolov commented on PR #15673:
URL: https://github.com/apache/kafka/pull/15673#issuecomment-2104306940

   Heya @junrao, after inspecting some of the tests I think I have misunderstood 
how marking an API version as unstable works. From a discussion with Nikhil I 
think we can only do one of two things - we can either keep the IBP and mark 
the new version as unstable or we can bump the IBP. Otherwise what happens is 
that the broker treats the version as non-existent while clients don't respect 
the configuration and still send the new version. Is this your understanding as 
well? If it is, then I believe I will just mark the version as unstable while I 
make the client changes and then I will mark it as stable and bump the IBP all 
in one commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on code in PR #15911:
URL: https://github.com/apache/kafka/pull/15911#discussion_r1596516913


##
storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java:
##
@@ -49,9 +45,10 @@ public class RemoteLogMetadataManagerTest {
 
 private final Time time = new MockTime(1);
 
-@ParameterizedTest(name = "remoteLogMetadataManager = {0}")
-@MethodSource("remoteLogMetadataManagers")
-public void testFetchSegments(RemoteLogMetadataManager 
remoteLogMetadataManager) throws Exception {
+private RemoteLogMetadataManager remoteLogMetadataManager = new 
TopicBasedRemoteLogMetadataManagerWrapperWithHarness();
+
+@Test
+public void testFetchSegments() throws Exception {
 try {

Review Comment:
   This can be addressed as a follow-up, I think :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16696: Removed the in-memory implementation of RSM and RLMM [kafka]

2024-05-10 Thread via GitHub


chia7712 commented on PR #15911:
URL: https://github.com/apache/kafka/pull/15911#issuecomment-2104271236

   QA is re-triggered, and I will merge it if QA does not show any related errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496660


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##
@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
 
 final ConfigException se = assertThrows(ConfigException.class, () -> 
new TopologyTestDriver(topology));
 final String msg = se.getMessage();
-assertTrue("Error about class cast with serdes", 
msg.contains("StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));

Review Comment:
   recovered test



##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+static <K, V> StateSerdes<K, V> prepareStoreSerde(final StateStoreContext context, final String storeName,
+                                                  final String changelogTopic, final Serde<K> keySerde,

Review Comment:
   done
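
   For readers following the thread, here is a minimal, self-contained sketch of the pattern this helper introduces: wrapping serde preparation so that a failure names the store and its changelog topic. The simplified signature, the stand-in ConfigException, and the message wording are illustrative assumptions, not the PR's exact code.

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.StateSerdes;

public final class StoreSerdeInitializerSketch {

    // The real method also receives the store context so it can fall back to the default
    // serdes from the Streams config; that lookup is elided in this sketch.
    static <K, V> StateSerdes<K, V> prepareStoreSerde(final String storeName,
                                                      final String changelogTopic,
                                                      final Serde<K> keySerde,
                                                      final Serde<V> valueSerde) {
        try {
            if (keySerde == null || valueSerde == null) {
                // Stand-in for the ConfigException thrown when no default serde is configured.
                throw new ConfigException("Please specify a default key/value serde via StreamsConfig "
                        + "or provide one on the store/operator");
            }
            return new StateSerdes<>(changelogTopic, keySerde, valueSerde);
        } catch (final ConfigException e) {
            // The point of the change: surface which store (and changelog topic) failed to initialize.
            throw new StreamsException(String.format(
                    "Failed to initialize key/value serdes for store %s (changelog topic %s)",
                    storeName, changelogTopic), e);
        }
    }
}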






Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496472


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+} catch (final ConfigException e) {
+throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));

Review Comment:
   added original message
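
   A minimal, self-contained sketch of the pattern being discussed here, i.e. naming the failing sink node while keeping the original ConfigException text. The class, the stand-in prepareKeySerializer, and the messages below are illustrative assumptions rather than the PR's exact code.

import org.apache.kafka.common.config.ConfigException;

public final class SinkSerdeErrorContextSketch {

    // Hypothetical stand-in for SinkNode#prepareKeySerializer, which can throw a
    // ConfigException when no serde is specified and no default is configured.
    private static void prepareKeySerializer() {
        throw new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG "
                + "or provide a serde on the sink");
    }

    // The pattern under discussion: wrap the failure so it names the sink node
    // but still carries the original message.
    static void initKeySerializer(final String nodeName) {
        try {
            prepareKeySerializer();
        } catch (final ConfigException e) {
            throw new ConfigException(String.format(
                    "Failed to initialize key serdes for sink node %s. %s", nodeName, e.getMessage()));
        }
    }

    public static void main(final String[] args) {
        try {
            initKeySerializer("KSTREAM-SINK-0000000003");
        } catch (final ConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}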






Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]

2024-05-10 Thread via GitHub


AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596492735


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
 public void init(final InternalProcessorContext context) {
 super.init(context);
 this.context = context;
-keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+try {
+keySerializer = prepareKeySerializer(keySerializer, context, this.name());

Review Comment:
   Nice catch! Updated






[PR] Increasing timeout for wait_for_loggers to 30s [kafka]

2024-05-10 Thread via GitHub


vamossagar12 opened a new pull request, #15915:
URL: https://github.com/apache/kafka/pull/15915

   (no comment)





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on PR #15305:
URL: https://github.com/apache/kafka/pull/15305#issuecomment-2104218899

   Thanks @gharris1727, I have made the changes you suggested. I also modified the warning line so that it is printed based on the tick thread stage presented to the coordinator, and added a test to verify the same. LMKWYT.





Re: [PR] KAFKA-16197: Print Connect worker specific logs on poll timeout expiry [kafka]

2024-05-10 Thread via GitHub


vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596476664


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##
@@ -267,6 +267,18 @@ public String memberId() {
 return JoinGroupRequest.UNKNOWN_MEMBER_ID;
 }
 
+@Override
+protected void handlePollTimeoutExpiry() {

Review Comment:
   Thanks Greg, I think that makes sense. I have extended the `WorkerRebalanceListener` with another hook that the coordinator invokes upon poll timeout expiry. This way, it can access the tick thread's stage that was being executed at that point in time. One thing I am wondering about: there could be races where the reported stage no longer reflects the point at which the tick thread is actually blocked (it may have moved on, or become null, by the time the hook is invoked), but I think that is acceptable if we treat this as best effort. WDYT?
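
   To make the idea concrete, here is a minimal, self-contained sketch of the shape described above. The hook name (onPollTimeoutExpiry), the way the stage is recorded, and the example stage text are assumptions based on this thread, not the PR's final API.

import java.util.concurrent.atomic.AtomicReference;

public final class PollTimeoutHookSketch {

    // Extra callback on the rebalance listener, invoked by the coordinator when the
    // poll interval expires (name is an assumption, not the PR's final signature).
    interface RebalanceListenerWithTimeoutHook {
        void onPollTimeoutExpiry(String tickThreadStage);
    }

    // The tick thread publishes its current stage; the coordinator only reads it, so the
    // value is best effort and may be stale or null by the time the timeout actually fires.
    private final AtomicReference<String> tickThreadStage = new AtomicReference<>();
    private final RebalanceListenerWithTimeoutHook listener;

    PollTimeoutHookSketch(final RebalanceListenerWithTimeoutHook listener) {
        this.listener = listener;
    }

    void recordStage(final String stage) {
        tickThreadStage.set(stage);
    }

    // Called where the coordinator would otherwise only log a generic poll-timeout warning.
    void handlePollTimeoutExpiry() {
        listener.onPollTimeoutExpiry(tickThreadStage.get());
    }

    public static void main(final String[] args) {
        final PollTimeoutHookSketch coordinator = new PollTimeoutHookSketch(stage ->
                System.out.println("Worker poll timeout expired while the tick thread was: " + stage));
        coordinator.recordStage("reading to the end of the config topic");
        coordinator.handlePollTimeoutExpiry();
    }
}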






Re: [PR] MINOR: Upgrade scala-logging to 3.9.5 [kafka]

2024-05-10 Thread via GitHub


viktorsomogyi commented on PR #15914:
URL: https://github.com/apache/kafka/pull/15914#issuecomment-2104207428

   @ijuma would you please review this small PR?




