Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520862105


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -69,6 +97,15 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 createOldMessageFormatBrokers()
 produceMessagesInOneBatch()
 verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")

Review Comment:
   This is to test the logic in `RecordsInfo#info`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520861572


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -60,6 +63,31 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
   def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {
 produceMessagesInOneBatch("gzip")
 verifyListOffsets()
+
+// test LogAppendTime case
+val props: Properties = new Properties()
+props.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, 
"LogAppendTime")
+createTopicWithConfig(topicNameWithCustomConfigs, props)
+produceMessagesInOneBatch("gzip", topicNameWithCustomConfigs)
+// In LogAppendTime's case, the maxTimestampOffset should be the first 
message of the batch.
+// So in this one batch test, it'll be the first offset 0
+verifyListOffsets(topic = topicNameWithCustomConfigs, 0)

Review Comment:
   Added test cases for `LogAppendTime` scenarios. This is test the logic in 
`validateMessagesAndAssignOffsetsCompressed`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520858564


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before() {
 cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
 
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
 
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+}
 
+public void setUp() {

Review Comment:
   The `setUp` is necessary because when in `before` (i.e. `beforeEach`), the 
kafka cluster is still not created yet. That's why we can inject custom broker 
properties there. And in `setUp`, we can create producer/admin to talk to the 
brokers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520857036


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
+return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   > This has the same issue. The semantic for MAX_TIMESTAMP is the first 
offset with the max timestamp. So, if timestamp is 
TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of 
lastOffset.
   
   Yes, updated.
   
   > Also, could we remove the shallow part in 
RecordsInfo.shallowOffsetOfMaxTimestamp?
   I've added a comment in another PR: 
https://github.com/apache/kafka/pull/15476#issuecomment-1990459827 . We can do 
all the renaming there.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
+return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   > This has the same issue. The semantic for MAX_TIMESTAMP is the first 
offset with the max timestamp. So, if timestamp is 
TimestampType.LOG_APPEND_TIME, we need to use the baseOffset, instead of 
lastOffset.
   
   Yes, updated.
   
   > Also, could we remove the shallow part in 
RecordsInfo.shallowOffsetOfMaxTimestamp?
   
   I've added a comment in another PR: 
https://github.com/apache/kafka/pull/15476#issuecomment-1990459827 . We can do 
all the renaming there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-11 Thread via GitHub


showuon commented on PR #15476:
URL: https://github.com/apache/kafka/pull/15476#issuecomment-1990459827

   Please also address this comment in this PR: 
https://github.com/apache/kafka/pull/15474#discussion_r1520262997 . Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup BoundedList to Make Constructors More Safe [kafka]

2024-03-11 Thread via GitHub


ChrisAHolland commented on PR #15507:
URL: https://github.com/apache/kafka/pull/15507#issuecomment-1990325002

   @chia7712 @cadonna Hi, saw you approved some recent PR's, wondering if you 
could take a look at mine or suggest someone better suited that I can ask? 
Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520774849


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -931,17 +920,14 @@ public void testGroupEpochBumpWhenNewStaticMemberJoins() {
 
 ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
 .setMemberEpoch(11)
+.setState(MemberState.UNRELEASED_PARTITIONS)
 .setInstanceId(memberId3)
 .setPreviousMemberEpoch(0)
-.setTargetMemberEpoch(11)
 .setClientId("client")
 .setClientHost("localhost/127.0.0.1")
 .setRebalanceTimeoutMs(5000)
 .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
 .setServerAssignorName("range")
-.setPartitionsPendingAssignment(mkAssignment(

Review Comment:
   do we have any way to track the partitions still pending? Or is it just a 
matter of the difference between the owned partition and the assignment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825496#comment-17825496
 ] 

Matthias J. Sax commented on KAFKA-16359:
-

Just to clarify: we cannot re-publish a 3.7.0 artifact. – We can only fix this 
with 3.7.1 release. Seems we should push out 3.7.1 rather sooner than later.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: it's {{META-INF/MANIFEST.MF}} bogusly include a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in it's {{META-INF/MANIFEST.MF}} or 
> a new release should be published that corrects this defect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520772129


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -338,7 +338,6 @@ public void testConsumerGroupMemberEpochValidation() {
 ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
 .setMemberEpoch(100)
 .setPreviousMemberEpoch(99)
-.setTargetMemberEpoch(100)

Review Comment:
   should we explicitly set state here? I think it is stable by default but the 
other builders added it. Ditto for a few more in this file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520772129


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -338,7 +338,6 @@ public void testConsumerGroupMemberEpochValidation() {
 ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
 .setMemberEpoch(100)
 .setPreviousMemberEpoch(99)
-.setTargetMemberEpoch(100)

Review Comment:
   should we explicitly set state here? I think it is stable by default but the 
other builders added it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16357.
-
Resolution: Duplicate

> Kafka Client JAR manifest breaks javac linting
> --
>
> Key: KAFKA-16357
> URL: https://issues.apache.org/jira/browse/KAFKA-16357
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
> Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
>Reporter: Jacek Wojciechowski
>Priority: Critical
>
> I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project 
> is not building anymore.
> The reason is that kafka-clients-3.7.0.jar contains the following entry in 
> its JAR manifest file:
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
>  .5.jar slf4j-api-1.7.36.jar
> I'm using Maven repo to keep my dependencies and those files are not in the 
> same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's 
> Class-Path are not correct. It fails my build because we build with javac 
> with all linting options on, in particular -Xlint:-path. It produces the 
> following warnings coming from javac:
> [WARNING] COMPILATION WARNING : 
> [INFO] -
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> Since we have also {{-Werror}} option enabled, it turns warnings into errors 
> and fails our build.
> I think our setup is quite typical: using Maven repo to store dependencies, 
> having linting on and -Werror. Unfortunatelly, it doesn't work with the 
> lastest kafka-clients because of the entries in the manifest's Class-Path. 
> And I think it might affect quite a lot of projects set up in a similar way.
> I don't know what was the reason to add Class-Path entry in the JAR manifest 
> file - but perhaps this effect was not considered.
> It would be great if you removed the Class-Path entry from the JAR manifest 
> file.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16357:

Priority: Critical  (was: Major)

> Kafka Client JAR manifest breaks javac linting
> --
>
> Key: KAFKA-16357
> URL: https://issues.apache.org/jira/browse/KAFKA-16357
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
> Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
>Reporter: Jacek Wojciechowski
>Priority: Critical
>
> I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project 
> is not building anymore.
> The reason is that kafka-clients-3.7.0.jar contains the following entry in 
> its JAR manifest file:
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
>  .5.jar slf4j-api-1.7.36.jar
> I'm using Maven repo to keep my dependencies and those files are not in the 
> same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's 
> Class-Path are not correct. It fails my build because we build with javac 
> with all linting options on, in particular -Xlint:-path. It produces the 
> following warnings coming from javac:
> [WARNING] COMPILATION WARNING : 
> [INFO] -
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> "/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> Since we have also {{-Werror}} option enabled, it turns warnings into errors 
> and fails our build.
> I think our setup is quite typical: using Maven repo to store dependencies, 
> having linting on and -Werror. Unfortunatelly, it doesn't work with the 
> lastest kafka-clients because of the entries in the manifest's Class-Path. 
> And I think it might affect quite a lot of projects set up in a similar way.
> I don't know what was the reason to add Class-Path entry in the JAR manifest 
> file - but perhaps this effect was not considered.
> It would be great if you removed the Class-Path entry from the JAR manifest 
> file.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15519:
URL: https://github.com/apache/kafka/pull/15519#discussion_r1520683129


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
+// InstanceId - send when leaving the group as a static member
 membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+if (membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
 data.setInstanceId(groupInstanceId);
 sentFields.instanceId = groupInstanceId;

Review Comment:
   It is never read after this PR, so I feel it is safe to remove it. 
Otherwise, that will result in another warning about "never read".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16360) Release plan of 3.x kafka releases.

2024-03-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-16360.
-
Resolution: Invalid

Please don't use Jira to ask questions. Jira tickets are for bug reports and 
features only.

Question should be asked on the user and/or dev mailing lists: 
https://kafka.apache.org/contact

> Release plan of 3.x kafka releases.
> ---
>
> Key: KAFKA-16360
> URL: https://issues.apache.org/jira/browse/KAFKA-16360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Major
>
> KIP 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline]
>  mentions ,
> h2. Kafka 3.7
>  * January 2024
>  * Final release with ZK mode
> But we see in Jira, some tickets are marked for 3.8 release. Does apache 
> continue to make 3.x releases having zookeeper and kraft supported 
> independent of pure kraft 4.x releases ?
> If yes, how many more releases can be expected on 3.x release line ?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520671475


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamWindowCloseTest.java:
##
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.test.MockApiProcessor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+
+public class KStreamKStreamWindowCloseTest {
+
+private static final String LEFT = "left";
+private static final String RIGHT = "right";
+private final static Properties PROPS = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
+private static final Consumed CONSUMED = 
Consumed.with(Serdes.Integer(), Serdes.String());
+private static final JoinWindows WINDOW = 
JoinWindows.ofTimeDifferenceAndGrace(ofMillis(10), ofMillis(5));
+
+static List streams() {
+return Arrays.asList(
+innerJoin(),
+leftJoin(),
+outerJoin()
+);
+}
+
+private static Arguments innerJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream stream = builder.stream(LEFT, 
CONSUMED).join(
+builder.stream(RIGHT, CONSUMED),
+MockValueJoiner.TOSTRING_JOINER,
+WINDOW,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier 
processorSupplier = new MockApiProcessorSupplier<>();
+stream.process(processorSupplier);
+return Arguments.of(builder.build(PROPS), processorSupplier);
+}
+
+private static Arguments leftJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream stream = builder.stream(LEFT, 
CONSUMED).leftJoin(
+builder.stream(RIGHT, CONSUMED),
+MockValueJoiner.TOSTRING_JOINER,
+WINDOW,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier 
processorSupplier = new MockApiProcessorSupplier<>();
+stream.process(processorSupplier);
+return Arguments.of(builder.build(PROPS), processorSupplier);
+}
+
+private static Arguments outerJoin() {
+final StreamsBuilder builder = new StreamsBuilder();
+final KStream stream = builder.stream(LEFT, 
CONSUMED).outerJoin(
+builder.stream(RIGHT, CONSUMED),
+MockValueJoiner.TOSTRING_JOINER,
+WINDOW,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+);
+final MockApiProcessorSupplier 
supplier = new MockApiProcessorSupplier<>();
+stream.process(supplier);
+return Arguments.of(builder.build(PROPS), supplier);
+}
+
+@ParameterizedTest
+@MethodSource("streams")
+public void recordsArrivingPostWindowCloseShouldBeDropped(
+final Topology topology,
+final MockApiProcessorSupplier supplier) {
+  

Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520627804


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -25,36 +25,46 @@
 
 public final class StreamStreamJoinUtil {
 
-private StreamStreamJoinUtil(){
+private StreamStreamJoinUtil() {
 }
 
 public static  boolean skipRecord(
 final Record record, final Logger logger,
 final Sensor droppedRecordsSensor,
-final ProcessorContext context) {
+final ProcessorContext context
+) {
 // we do join iff keys are equal, thus, if key is null we cannot join 
and just ignore the record
 //
 // we also ignore the record if value is null, because in a key-value 
data model a null-value indicates
 // an empty message (ie, there is nothing to be joined) -- this 
contrast SQL NULL semantics
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
 if (record.key() == null || record.value() == null) {
-if (context.recordMetadata().isPresent()) {
-final RecordMetadata recordMetadata = 
context.recordMetadata().get();
-logger.warn(
-"Skipping record due to null key or value. "
-+ "topic=[{}] partition=[{}] offset=[{}]",
-recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
-);
-} else {
-logger.warn(
-"Skipping record due to null key or value. Topic, 
partition, and offset not known."
-);
-}
-droppedRecordsSensor.record();
+logSkip("null key or value", logger, droppedRecordsSensor, 
context);
 return true;
 } else {
 return false;
 }
 }
+
+public static  void logSkip(
+final String reason,
+final Logger logger,
+final Sensor droppedRecordsSensor,
+final ProcessorContext context
+) {
+if (context.recordMetadata().isPresent()) {
+final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+logger.warn(
+"Skipping record. reason=[{}] topic=[{}] partition=[{}] 
offset=[{}]",

Review Comment:
   ```suggestion
   "Skipping record. Reason=[{}] topic=[{}] partition=[{}] 
offset=[{}]",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15189:
URL: https://github.com/apache/kafka/pull/15189#discussion_r1520626385


##
streams/src/test/java/org/apache/kafka/streams/integration/RelaxedNullKeyRequirementJoinTest.java:
##
@@ -71,11 +71,21 @@ void afterEach() {
 @Test
 void testRelaxedLeftStreamStreamJoin() {
 leftStream
-.leftJoin(rightStream, JOINER, WINDOW)
+.leftJoin(rightStream, JOINER, WINDOW_60MS_GRACE_10MS)
 .to(OUT);
 initTopology();
-left.pipeInput(null, "leftValue", 1);
-assertEquals(Collections.singletonList(new KeyValue<>(null, 
"leftValue|null")), out.readKeyValuesToList());
+left.pipeInput(null, "leftValue1", 1);
+left.pipeInput(null, "leftValue2", 90);
+left.pipeInput(null, "lateArrival-Dropped", 19);
+left.pipeInput(null, "lateArrivalWithinGrace", 20);
+assertEquals(
+Arrays.asList(
+new KeyValue<>(null, "leftValue1|null"),
+new KeyValue<>(null, "leftValue2|null"),
+new KeyValue<>(null, "lateArrivalWithinGrace|null")

Review Comment:
   Oh. This is `null`-key case. My bad. For `null`-key we know it won't join in 
the future, so no reason to artificially delay the output. Thanks for pointing 
it out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove deprecation and exception throw in ProcessorRecordContext#hashcode [kafka]

2024-03-11 Thread via GitHub


mjsax commented on PR #15508:
URL: https://github.com/apache/kafka/pull/15508#issuecomment-1989699015

   What Bruno says. Git history if you friend :)
   
   https://github.com/apache/kafka/pull/6602/files#r278810522
   
   I think we should keep the code as-is and close this PR w/o merging (or add 
some more detailed comment to explain why the code is this way better).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 Sloppy bug: moved the outerJounBreak-flags out of the loop [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15510:
URL: https://github.com/apache/kafka/pull/15510#discussion_r1520612406


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -223,9 +223,9 @@ private void emitNonJoinedOuterRecords(
 try (final KeyValueIterator, 
LeftOrRightValue> it = store.all()) {
 TimestampedKeyAndJoinSide prevKey = null;
 
+boolean outerJoinLeftBreak = false;
+boolean outerJoinRightBreak = false;
 while (it.hasNext()) {
-boolean outerJoinLeftBreak = false;
-boolean outerJoinRightBreak = false;

Review Comment:
   Ups... Wondering why we did not catch this in the unit tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16366) Refactor KTable source optimization

2024-03-11 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-16366:
---

 Summary: Refactor KTable source optimization
 Key: KAFKA-16366
 URL: https://issues.apache.org/jira/browse/KAFKA-16366
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


Kafka Streams DSL offers an optimization to re-use an input topic as table 
changelog, in favor of creating a dedicated changelog topic.

So far, the Processor API did not support any such feature, and thus when the 
DSL compiles down into a Topology, we needed to access topology internal stuff 
to allow for this optimization.

With KIP-813 (merged for AK 3.8), we added `Topology#addReadOnlyStateStore` as 
public API, and thus we should refactor the DSL compilation code, to use this 
public API to build the `Topology` instead of internal APIs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15519:
URL: https://github.com/apache/kafka/pull/15519#discussion_r1520582633


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
+// InstanceId - send when leaving the group as a static member
 membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+if (membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
 data.setInstanceId(groupInstanceId);
 sentFields.instanceId = groupInstanceId;

Review Comment:
   I don't understand enough about this part of the code to judge...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520575467


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {

Review Comment:
   > We can give the test a more descriptive name though, right?
   
   sure. sorry for the lazy naming :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15519:
URL: https://github.com/apache/kafka/pull/15519#discussion_r1520572340


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -529,10 +529,9 @@ public ConsumerGroupHeartbeatRequestData 
buildRequestData() {
 // MemberEpoch - always sent
 data.setMemberEpoch(membershipManager.memberEpoch());
 
-// InstanceId - only sent if has changed since the last heartbeat
-// Always send when leaving the group as a static member
+// InstanceId - send when leaving the group as a static member
 membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
-if (!groupInstanceId.equals(sentFields.instanceId) || 
membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
+if (membershipManager.memberEpoch() == 
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
 data.setInstanceId(groupInstanceId);
 sentFields.instanceId = groupInstanceId;

Review Comment:
   it seems `sentFields.instanceId` is useless after this PR. maybe we should 
remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520542402


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {

Review Comment:
   We can give the test a more descriptive name though, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16365) AssignmentsManager mismanages completion notifications

2024-03-11 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16365:
---

 Summary: AssignmentsManager mismanages completion notifications
 Key: KAFKA-16365
 URL: https://issues.apache.org/jira/browse/KAFKA-16365
 Project: Kafka
  Issue Type: Sub-task
Reporter: Igor Soarez
Assignee: Igor Soarez


When moving replicas between directories in the same broker, future replica 
promotion hinges on acknowledgment from the controller of a change in the 
directory assignment.
 
ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion 
notification of the directory assignment change.
 
In its current form, under certain assignment scheduling, AssignmentsManager 
both miss completion notifications, or prematurely trigger them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1520520996


##
core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala:
##
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import kafka.api.IntegrationTestHarness
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.{Collections, Properties}
+
+class OffsetOfMaxTimestampTest extends IntegrationTestHarness {
+  @AfterEach
+  override def tearDown(): Unit = {
+TestUtils.shutdownServers(brokers)
+super.tearDown()
+  }
+
+  override def brokerCount: Int = 1
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWithNoCompression(quorum: String): Unit = {
+test(false)
+  }
+
+  private def test(useCompression: Boolean): Unit = {
+val topicName: String = "OffsetOfMaxTimestampTest-" + 
System.currentTimeMillis()
+
+val admin: Admin = Admin.create(adminClientConfig)
+try {
+  admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 
1.toShort)
+.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, 
if (useCompression) "gzip" else "none"
+  val props: Properties = new Properties
+  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
+  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) 
"gzip" else "none")
+  val producer: KafkaProducer[String, String] = new KafkaProducer[String, 
String](props)
+  try {
+val time: Long = 1
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
100, null, "val20"))
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
400, null, "val15"))
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
250, null, "val15"))

Review Comment:
   @jolshan I prefer to have following test: 
https://github.com/apache/kafka/pull/15474#discussion_r1520206161



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


junrao commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520506664


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +53,24 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before() {
 cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
 
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
 
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+}
 
+public void setUp() {

Review Comment:
   Why do we need to split the logic between `before` and `setUp`?



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  def produceMessagesInOneBatch(compressionType: String = "none"): Unit = {

Review Comment:
   Could this method be private? Ditto for `produceMessagesInSeparateBatch`.



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
+return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   This has the same issue. The semantic for MAX_TIMESTAMP is the first offset 
with the max timestamp. So, if timestamp is TimestampType.LOG_APPEND_TIME, we 
need to use the baseOffset, instead of lastOffset.
   
   Also, could we remove the shallow part in 
RecordsInfo.shallowOffsetOfMaxTimestamp?



##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -82,19 +122,63 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 ).asJava, new ListOffsetsOptions()).all().get().get(tp)
   }
 
-  def produceMessages(): Unit = {
+  def produceMessagesInOneBatch(compressionType: String = "none"): Unit = {
 val records = Seq(
   new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 100L,
-null, new Array[Byte](1)),
+null, new Array[Byte](10)),
   new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 999L,
-null, new Array[Byte](1)),
+null, new Array[Byte](10)),
   new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
-null, new Array[Byte](1)),
+null, new Array[Byte](10)),
 )
-TestUtils.produceMessages(brokers, records, -1)
+// create a producer with large linger.ms and enough batch.size (default 
is enough for three 10 bytes records),
+// so that we can confirm all records will be accumulated in producer 
until we flush them into one batch.
+val producer = createProducer(
+  plaintextBootstrapServers(brokers),
+  deliveryTimeoutMs = Int.MaxValue,
+  lingerMs = Int.MaxValue,
+  compressionType = compressionType)
+
+try {
+  val futures = records.map(producer.send)
+  producer.flush()
+  futures.foreach(_.get)
+} finally {
+  producer.close()
+}
   }
 
-  def generateConfigs: Seq[KafkaConfig] =
-TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).map(KafkaConfig.fromProps)
+  def produceMessagesInSeparateBatch(compressionType: String = "none"): Unit = 
{
+val records = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
0, 100L,
+null, new Array[Byte](10)))
+val records2 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
0, 999L,
+  null, new Array[Byte](10)))
+val records3 = Seq(new ProducerRecord[Array[Byte], Array[Byte]](topicName, 
0, 200L,
+  null, new Array[Byte](10)))
+
+val producer = createProducer(
+  

Re: [PR] KAFKA-14683 Cleanup WorkerSinkTaskTest [kafka]

2024-03-11 Thread via GitHub


hgeraldino commented on PR #15506:
URL: https://github.com/apache/kafka/pull/15506#issuecomment-1989568163

   Thanks @chia7712 for your review
   
   @gharris1727 does this looks good to you? It's the final PR to consider the 
`WorkerSinkTaskTest` migration done done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16223 Replace EasyMock/PowerMock with Mockito for KafkaConfigBackingStoreTest (1/3) [kafka]

2024-03-11 Thread via GitHub


hgeraldino opened a new pull request, #15520:
URL: https://github.com/apache/kafka/pull/15520

   This is the last remaining Kafka Connect test that needs migration from 
PowerMock/EasyMock to Mockito.
   
   Following the same approach as the `WorkerSinkTaskTest` migration, which was 
migrated in [1](https://github.com/apache/kafka/pull/14663) 
[2](https://github.com/apache/kafka/pull/15313) 
[3](https://github.com/apache/kafka/pull/15316) separate batches, this PR 
contains just ~ 1/3 of the total number of test methods, which should make the 
review "easier".
   
   As usual, I Iook forward for your comments and feedback @C0urante 
@gharris1727 @divijvaidya @clolov @mukkachaitanya 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+continue;
+}
+
+  

[jira] [Commented] (KAFKA-16291) Mirrormaker2 wrong checkpoints

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825467#comment-17825467
 ] 

Greg Harris commented on KAFKA-16291:
-

[~claudio.benfatto] That's a good idea, I agree that the default behavior isn't 
good enough in all scenarios and a configuration is needed.

Since it includes a user configuration, and needs significant design work, this 
will need a KIP. I've opened KAFKA-16364 to track the work there, and you're 
welcome to assign yourself and draft a KIP.

But just to temper your expectations here:

> Offset translation guarantees zero-redelivery

This is not possible given the asynchronous pattern used for offset 
translation. I think this can be true in an eventual-consistency sense: If the 
upstream consumer group is inactive for sufficiently long enough (and lag < N), 
then translation could be exact.

We can also use this as an opportunity to design an alternative to 
offset.lag.max=0 and the mirror source sync send semaphore[!] because even with 
a 100% retention solution on the MirrorCheckpointTask side, the 
MirrorSourceTask still drops syncs occasionally.

> Mirrormaker2 wrong checkpoints
> --
>
> Key: KAFKA-16291
> URL: https://issues.apache.org/jira/browse/KAFKA-16291
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.6.1
> Environment: Mirrormaker2 version 3.6.1 running on docker containers
>Reporter: Claudio Benfatto
>Priority: Major
>
> I am running Mirrormaker2 with the following configuration:
> {noformat}
> clusters = fallingwaterfall, weatheredbase
> sync.group.offsets.interval.seconds=30
> emit.checkpoints.interval.seconds=30
> offset.lag.max=0
> fallingwaterfall->weatheredbase.enabled = true
> weatheredbase->fallingwaterfall.enabled = false
> sync.group.offsets.enabled=true
> emit.heartbeats.enabled=true
> emit.checkpoints.enabled=true
> emit.checkpoints.interval.seconds=30
> refresh.groups.enabled=true
> refresh.groups.interval.seconds=30
> refresh.topics.enabled=true
> sync.topic.configs.enabled=true
> refresh.topics.interval.seconds=30
> sync.topic.acls.enabled = false
> fallingwaterfall->weatheredbase.topics = storage-demo-.*
> fallingwaterfall->weatheredbase.groups = storage-demo-.*
> group.id=mirror-maker-fallingwaterfall-weatheredbase
> consumer.group.id=mirror-maker-fallingwaterfall-weatheredbase
> fallingwaterfall.consumer.isolation.level = read_committed
> weatheredbase.producer.enable.idempotence = true
> weatheredbase.producer.acks=all
> weatheredbase.exactly.once.source.support = enabled
> replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
> {noformat}
> I am experiencing issues with the consumer group offset synchronisation.
> I have a setup with a 12-partition topic, named *storage-demo-test,* a single 
> transactional producer to this topic and a consumer group, named 
> *storage-demo-test-cg,* consuming from it.
> The consumer configuration is:
> {code:java}
> 'auto.offset.reset': 'earliest',
> 'isolation.level': 'read_committed',
> 'enable.auto.commit': False, {code}
> and I'm committing the offsets explicitly and synchronously after each poll.
> What I observed is that the synchronised offsets between the upstream and 
> downstream cluster for the *storage-demo-test-cg* are often wrong.
> For example in the case of this checkpoint:
> {code:java}
> (1, 1708505669764) - 6252 - 
> CheckpointKey(consumer_group='storage-demo-test-cg', 
> topic='storage-demo-test', partition=5) - 
> CheckpointValue(upstream_offset=197532, downstream_offset=196300) {code}
> We have a mismatch in the replicated messages:
> {code:java}
> [fallingwaterfall]# kcat -C -b0 -t storage-demo-test -p 5 -o 197532 -c 1
> Test message 1027-0 {code}
> {code:java}
> [weatheredbase]# kcat -C -b0 -t storage-demo-test -p 5 -o 196300 -c 1
> Test message 1015-9 {code}
> In the Mirrormaker2 logs I see many of these messages:
> {code:java}
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:18,534] TRACE [MirrorCheckpointConnector|task-0] 
> latestDownstreamOffset 196300 is larger than or equal to 
> convertedUpstreamOffset 196300 for TopicPartition storage-demo-test-5 
> (org.apache.kafka.connect.mirror.MirrorCheckpointTask:337)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] DEBUG [MirrorCheckpointConnector|task-0] 
> translateDownstream(storage-demo-test-cg,storage-demo-test-5,197532): 
> Translated 195684 (relative to OffsetSync{topicPartition=storage-demo-test-5, 
> upstreamOffset=196913, downstreamOffset=195683}) 
> (org.apache.kafka.connect.mirror.OffsetSyncStore:160)
> mirrormaker2-fallingwaterfall-weatheredbase-1 - mirrormaker2-server - 
> [2024-02-21 09:02:01,557] TRACE [MirrorCheckpointConnector|task-0] 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520498085


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Because we provide the topic list and the list only contains the topic we 
have not received the result, we can skip setting the partition 0 topics in the 
cursor. Such topics will be the first one in the Topics field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+continue;
+}
+
+  

[jira] [Created] (KAFKA-16364) MM2 High-Resolution Offset Translation

2024-03-11 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16364:
---

 Summary: MM2 High-Resolution Offset Translation
 Key: KAFKA-16364
 URL: https://issues.apache.org/jira/browse/KAFKA-16364
 Project: Kafka
  Issue Type: New Feature
  Components: mirrormaker
Reporter: Greg Harris


The current OffsetSyncStore implementation 
[https://github.com/apache/kafka/blob/8b72a2c72f09838fdd2e7416c98d30fe876b4078/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L57]
 stores a sparse index of offset syncs. This attempts to strike a balanced 
default behavior between offset translation availability, memory usage, and 
throughput on the offset syncs topic.

However, this balanced default behavior is not good enough in all 
circumstances. When precise offset translation is needed away from the end of 
the topic, such as for consumer groups with persistent lag, offset translation 
can be more precise. Users should have a way to configure high-precision offset 
translation, either through additional memory usage or re-reading the offset 
syncs topic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16152: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart [kafka]

2024-03-11 Thread via GitHub


mjsax commented on code in PR #15419:
URL: https://github.com/apache/kafka/pull/15419#discussion_r1520482326


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -546,8 +546,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() 
{
 data.setMemberEpoch(membershipManager.memberEpoch());
 
 // InstanceId - only sent if has changed since the last heartbeat

Review Comment:
   Did a follow up PR: https://github.com/apache/kafka/pull/15519



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: simplify consumer logic [kafka]

2024-03-11 Thread via GitHub


mjsax commented on PR #15519:
URL: https://github.com/apache/kafka/pull/15519#issuecomment-1989525897

   Follow up from https://github.com/apache/kafka/pull/15419/files#r1513841644


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: simplify consumer logic [kafka]

2024-03-11 Thread via GitHub


mjsax opened a new pull request, #15519:
URL: https://github.com/apache/kafka/pull/15519

   For static member, the `group.instance.id` cannot change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520443872


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+continue;
+}
+
+

[jira] [Comment Edited] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825454#comment-17825454
 ] 

Greg Harris edited comment on KAFKA-15841 at 3/11/24 9:46 PM:
--

[~henriquemota] Okay I think i understand better what you're trying to achieve.

> ... one topic per table...
> We have a JDBC Sink for each table.

Okay, you're using scenario (1), one connector per-topic, which should come to 
at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too 
many to fit on a single machine, and certainly needs a cluster to distribute 
the work.

In this scenario, Connect should be able to distribute approximately 9000/M 
connectors and 9000/M tasks to each of the M workers in a distributed cluster, 
barring any other practical limits/timeouts that i'm not aware of, so check for 
ERROR messages.

> We tried to change the 'topics' property in the configurations using the 
> 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property 
> when it is returned by 'taskConfigs(int maxTasks)'.

The reason it does this is because the `topics` property is passed to the 
consumers to have them subscribe to the input topics, and the Consumer/Connect 
processing model has this subscription be the same for all consumers.
This doesn't mean that every consumer is consuming every topic, however. Having 
a uniform subscription across all of the consumers in a group tells the 
consumers to assign the work among themselves, assigning the topic-partitions 
to each of the consumers according to the configured assignor.

As an example, say your connector config had `topics=a,b`, and these two topics 
had 2 partitions, and tasks.max=2.

The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 
partitions could be distributed like this by the consumer partition assignor:

task-0: a-0, b-0

task-1: a-1, b-1

Or any permutation. This is where the assignor I mentioned is important; The 
RangeAssignor can generate some pretty unbalanced assignments: 
[https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html]

If you choose a different assignor (RoundRobin, Sticky, etc), then you can 
switch to scenario (2), with one connector per client, and some tasks.max 
around 10. This would give you ~90 connectors with 900 tasks, each working on 
10 topics.

You can tune tasks.max up and down if you need more throughput or want less 
consumer/task overhead.


was (Author: gharris1727):
[~henriquemota] Okay I think i understand better what you're trying to achieve.

> ... one topic per table...
> We have a JDBC Sink for each table.

Okay, you're using scenario (1), one connector per-topic, which should come to 
at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too 
many to fit on a single machine, and certainly needs a cluster to distribute 
the work.

In this scenario, Connect should be able to distribute approximately 9000/M 
connectors and 9000/M tasks to each of the M workers in a distributed cluster, 
barring any other practical limits/timeouts that i'm not aware of, so check for 
ERROR messages.

> We tried to change the 'topics' property in the configurations using the 
> 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property 
> when it is returned by 'taskConfigs(int maxTasks)'.

The reason it does this is because the `topics` property is passed to the 
consumers to have them subscribe to the input topics, and the Consumer/Connect 
processing model has this subscription be the same for all consumers.
This doesn't mean that every consumer is consuming every topic, however. Having 
a uniform subscription across all of the consumers in a group tells the 
consumers to assign the work among themselves, assigning the topic-partitions 
to each of the consumers according to the configured assignor.

As an example, say your connector config had `topics=a,b`, and these two topics 
had 2 partitions, and tasks.max=2.

The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 
partitions could be distributed like this by the consumer partition assignor:

task-0: a-0, b-0

task-1: a-1, b-1

Or any permutation. This is where the partitioner I mentioned is important; The 
RangeAssignor can generate some pretty unbalanced assignments: 
[https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html]

If you choose a different assignor (RoundRobin, Sticky, etc), then you can 
switch to scenario (2), with one connector per client, and some tasks.max 
around 10. This would give you ~90 connectors with 900 tasks, each working on 
10 topics.

Tou can tune tasks.max up and down if you need more throughput or want less 
consumer/task overhead.

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
>   

[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825454#comment-17825454
 ] 

Greg Harris commented on KAFKA-15841:
-

[~henriquemota] Okay I think i understand better what you're trying to achieve.

> ... one topic per table...
> We have a JDBC Sink for each table.

Okay, you're using scenario (1), one connector per-topic, which should come to 
at most 90 * 100 = 9000 connectors per Connect cluster. That is certainly too 
many to fit on a single machine, and certainly needs a cluster to distribute 
the work.

In this scenario, Connect should be able to distribute approximately 9000/M 
connectors and 9000/M tasks to each of the M workers in a distributed cluster, 
barring any other practical limits/timeouts that i'm not aware of, so check for 
ERROR messages.

> We tried to change the 'topics' property in the configurations using the 
> 'taskConfigs(int maxTasks)' method, but Kafka Connect ignores this property 
> when it is returned by 'taskConfigs(int maxTasks)'.

The reason it does this is because the `topics` property is passed to the 
consumers to have them subscribe to the input topics, and the Consumer/Connect 
processing model has this subscription be the same for all consumers.
This doesn't mean that every consumer is consuming every topic, however. Having 
a uniform subscription across all of the consumers in a group tells the 
consumers to assign the work among themselves, assigning the topic-partitions 
to each of the consumers according to the configured assignor.

As an example, say your connector config had `topics=a,b`, and these two topics 
had 2 partitions, and tasks.max=2.

The `topics` configs for both task-0 and task-1 would both be `a,b`, but the 4 
partitions could be distributed like this by the consumer partition assignor:

task-0: a-0, b-0

task-1: a-1, b-1

Or any permutation. This is where the partitioner I mentioned is important; The 
RangeAssignor can generate some pretty unbalanced assignments: 
[https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html]

If you choose a different assignor (RoundRobin, Sticky, etc), then you can 
switch to scenario (2), with one connector per client, and some tasks.max 
around 10. This would give you ~90 connectors with 900 tasks, each working on 
10 topics.

Tou can tune tasks.max up and down if you need more throughput or want less 
consumer/task overhead.

> Add Support for Topic-Level Partitioning in Kafka Connect
> -
>
> Key: KAFKA-15841
> URL: https://issues.apache.org/jira/browse/KAFKA-15841
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect
>Reporter: Henrique Mota
>Priority: Trivial
> Attachments: image-2024-02-19-13-48-55-875.png
>
>
> In our organization, we utilize JDBC sink connectors to consume data from 
> various topics, where each topic is dedicated to a specific tenant with a 
> single partition. Recently, we developed a custom sink based on the standard 
> JDBC sink, enabling us to pause consumption of a topic when encountering 
> problematic records.
> However, we face limitations within Kafka Connect, as it doesn't allow for 
> appropriate partitioning of topics among workers. We attempted a workaround 
> by breaking down the topics list within the 'topics' parameter. 
> Unfortunately, Kafka Connect overrides this parameter after invoking the 
> {{taskConfigs(int maxTasks)}} method from the 
> {{org.apache.kafka.connect.connector.Connector}} class.
> We request the addition of support in Kafka Connect to enable the 
> partitioning of topics among workers without requiring a fork. This 
> enhancement would facilitate better load distribution and allow for more 
> flexible configurations, particularly in scenarios where topics are dedicated 
> to different tenants.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16345: Optionally URL-encode clientID and clientSecret in authorization header [kafka]

2024-03-11 Thread via GitHub


bachmanity1 commented on PR #15475:
URL: https://github.com/apache/kafka/pull/15475#issuecomment-1989425275

   @kirktrue @mimaison kind reminder. Could you please have a look at the KIP? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520418403


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/MemberState.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The various states that a member can be in. For their definition,
+ * refer to the documentation of {{@link CurrentAssignmentBuilder}}.

Review Comment:
   Is this still the case? We removed a lot of the javadoc there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520383812


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -33,49 +33,6 @@
  * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
  * consumer group protocol. Given the current state of a member and a desired 
or target
  * assignment state, the state machine takes the necessary steps to converge 
them.
- *
- * The member state has the following properties:
- * - Current Epoch:

Review Comment:
   Do we have a replacement summary for this java doc? I noticed there were 
more comments in the code, but not sure if they covered everything.
   
   EDIT: I see some of the descriptions in the new MemberState class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520414314


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -170,72 +127,122 @@ public CurrentAssignmentBuilder withOwnedTopicPartitions(
  * @return A new ConsumerGroupMember or the current one.
  */
 public ConsumerGroupMember build() {
-// A new target assignment has been installed, we need to restart
-// the reconciliation loop from the beginning.
-if (targetAssignmentEpoch != member.targetMemberEpoch()) {
-return transitionToNewTargetAssignmentState();
-}
-
 switch (member.state()) {
-// Check if the partitions have been revoked by the member.
-case REVOKING:
-return maybeTransitionFromRevokingToAssigningOrStable();
+case STABLE:
+// When the member is in the STABLE state, we verify if a newer
+// epoch (or target assignment) is available. If it is, we can
+// reconcile the member towards it. Otherwise, we return.
+if (member.memberEpoch() != targetAssignmentEpoch) {
+return computeNextAssignment(
+member.memberEpoch(),
+member.assignedPartitions()
+);
+} else {
+return member;
+}
 
-// Check if pending partitions have been freed up.
-case ASSIGNING:
-return maybeTransitionFromAssigningToAssigningOrStable();
+case UNREVOKED_PARTITIONS:
+// When the member is in the UNREVOKED_PARTITIONS state, we 
wait
+// until the member has revoked the necessary partitions. They 
are
+// considered revoked when they are not anymore reported in the
+// owned partitions set in the ConsumerGroupHeartbeat API.
 
-// Nothing to do.
-case STABLE:
-return member;
+// If the member does not provide its owned partitions. We 
cannot
+// progress.
+if (ownedTopicPartitions == null) {
+return member;
+}
+
+// If the member provides its owned partitions. We verify if 
it still
+// owns any of the revoked partitions. If it does, we cannot 
progress.
+for (ConsumerGroupHeartbeatRequestData.TopicPartitions 
topicPartitions : ownedTopicPartitions) {
+for (Integer partitionId : topicPartitions.partitions()) {
+boolean stillHasRevokedPartition = member
+.partitionsPendingRevocation()
+.getOrDefault(topicPartitions.topicId(), 
Collections.emptySet())
+.contains(partitionId);
+if (stillHasRevokedPartition) {
+return member;
+}
+}
+}
+
+// When the member has revoked all the pending partitions, it 
can
+// transition to the next epoch (current + 1) and we can 
reconcile
+// its state towards the latest target assignment.
+return computeNextAssignment(
+member.memberEpoch() + 1,
+member.assignedPartitions()
+);
+
+case UNRELEASED_PARTITIONS:
+// When the member is in the UNRELEASED_PARTITIONS, we 
reconcile the
+// member towards the latest target assignment. This will 
assign any
+// of the unreleased partitions when they become available.
+return computeNextAssignment(
+member.memberEpoch(),
+member.assignedPartitions()
+);
+
+case UNKNOWN:
+// We could only end up in this state if a new state is added 
in the
+// future and the group coordinator is downgraded. In this 
case, the
+// best option is to fence the member to force it to rejoin 
the group
+// without any partitions and to reconcile it again from 
scratch.
+if (ownedTopicPartitions == null || 
!ownedTopicPartitions.isEmpty()) {
+throw new FencedMemberEpochException("The consumer group 
member is in a unknown state. "
++ "The member must abandon all its partitions and 
rejoin.");
+}
+
+return computeNextAssignment(

Review Comment:
   Is the idea we only hit this case below on restart? Ie, we fence and force 
the member out of the group, but it will still be unknown on rejoining (just 
with no owned partitions)



-- 
This is an automated message 

[jira] [Commented] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825439#comment-17825439
 ] 

Greg Harris commented on KAFKA-16356:
-

[~linu] I have given you permissions to assign tickets. You can assign this 
ticket and begin working on it when you have time.

Thanks for your interest in Kafka!

> Remove class-name dispatch in RemoteLogMetadataSerde
> 
>
> Key: KAFKA-16356
> URL: https://issues.apache.org/jira/browse/KAFKA-16356
> Project: Kafka
>  Issue Type: Task
>  Components: Tiered-Storage
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Priority: Trivial
>  Labels: newbie
>
> The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and 
> has to dispatch to one of four serializers depending on it's type. This is 
> done by taking the class name of the RemoteLogMetadata and looking it up in 
> maps to find the corresponding serializer for that class.
> This later requires an unchecked cast, because the RemoteLogMetadataTransform 
> is generic. This is all type-unsafe, and can be replaced with type-safe 
> if-elseif-else statements that may also be faster than the double-indirect 
> map lookups.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520383812


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java:
##
@@ -33,49 +33,6 @@
  * The CurrentAssignmentBuilder class encapsulates the reconciliation engine 
of the
  * consumer group protocol. Given the current state of a member and a desired 
or target
  * assignment state, the state machine takes the necessary steps to converge 
them.
- *
- * The member state has the following properties:
- * - Current Epoch:

Review Comment:
   Do we have a replacement summary for this java doc? I noticed there were 
more comments in the code, but not sure if they covered everything.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520381484


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -528,27 +478,6 @@ public Map> 
partitionsPendingRevocation() {
 return partitionsPendingRevocation;
 }
 
-/**
- * @return The set of partitions awaiting assignment to the member.
- */
-public Map> partitionsPendingAssignment() {
-return partitionsPendingAssignment;
-}
-
-/**
- * @return A string representation of the current assignment state.
- */
-public String currentAssignmentSummary() {

Review Comment:
   Did we get rid of this because it is not used anywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Tweak streams config doc [kafka]

2024-03-11 Thread via GitHub


cherylws commented on PR #15518:
URL: https://github.com/apache/kafka/pull/15518#issuecomment-1989345872

   @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16295: Align RocksDB and in-memory store init() sequences [kafka]

2024-03-11 Thread via GitHub


wcarlson5 commented on code in PR #15421:
URL: https://github.com/apache/kafka/pull/15421#discussion_r1520361134


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -170,9 +170,6 @@ public void init(final ProcessorContext context,
 @Override
 public void init(final StateStoreContext context,
  final StateStore root) {
-// open the DB dir
-metricsRecorder.init(getMetricsImpl(context), context.taskId());
-openDB(context.appConfigs(), context.stateDir());
 

Review Comment:
   Can we get one more line deleted here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Minor: Tweak streams config doc [kafka]

2024-03-11 Thread via GitHub


cherylws opened a new pull request, #15518:
URL: https://github.com/apache/kafka/pull/15518

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Kafka Streams docs fixes [kafka]

2024-03-11 Thread via GitHub


mjsax opened a new pull request, #15517:
URL: https://github.com/apache/kafka/pull/15517

   - add missing section to TOC
   - add default value for client.id


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1520300581


##
core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala:
##
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import kafka.api.IntegrationTestHarness
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.{Collections, Properties}
+
+class OffsetOfMaxTimestampTest extends IntegrationTestHarness {
+  @AfterEach
+  override def tearDown(): Unit = {
+TestUtils.shutdownServers(brokers)
+super.tearDown()
+  }
+
+  override def brokerCount: Int = 1
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWithNoCompression(quorum: String): Unit = {
+test(false)
+  }
+
+  private def test(useCompression: Boolean): Unit = {
+val topicName: String = "OffsetOfMaxTimestampTest-" + 
System.currentTimeMillis()
+
+val admin: Admin = Admin.create(adminClientConfig)
+try {
+  admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 
1.toShort)
+.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, 
if (useCompression) "gzip" else "none"
+  val props: Properties = new Properties
+  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
+  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) 
"gzip" else "none")
+  val producer: KafkaProducer[String, String] = new KafkaProducer[String, 
String](props)
+  try {
+val time: Long = 1
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
100, null, "val20"))
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
400, null, "val15"))
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
250, null, "val15"))

Review Comment:
   For the non-compression case, we can just make the record size 10 B (rather 
than 10KB). 
   There is also some discussion about how to control the batch size via flush



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15476:
URL: https://github.com/apache/kafka/pull/15476#discussion_r1520299410


##
core/src/test/scala/integration/kafka/api/OffsetOfMaxTimestampTest.scala:
##
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.api
+
+import kafka.api.IntegrationTestHarness
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, NewTopic, OffsetSpec}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.common.serialization.StringSerializer
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import java.util.{Collections, Properties}
+
+class OffsetOfMaxTimestampTest extends IntegrationTestHarness {
+  @AfterEach
+  override def tearDown(): Unit = {
+TestUtils.shutdownServers(brokers)
+super.tearDown()
+  }
+
+  override def brokerCount: Int = 1
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWithNoCompression(quorum: String): Unit = {
+test(false)
+  }
+
+  private def test(useCompression: Boolean): Unit = {
+val topicName: String = "OffsetOfMaxTimestampTest-" + 
System.currentTimeMillis()
+
+val admin: Admin = Admin.create(adminClientConfig)
+try {
+  admin.createTopics(Collections.singletonList(new NewTopic(topicName, 1, 
1.toShort)
+.configs(Collections.singletonMap(TopicConfig.COMPRESSION_TYPE_CONFIG, 
if (useCompression) "gzip" else "none"
+  val props: Properties = new Properties
+  props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
+  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
classOf[StringSerializer])
+  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, if (useCompression) 
"gzip" else "none")
+  val producer: KafkaProducer[String, String] = new KafkaProducer[String, 
String](props)
+  try {
+val time: Long = 1
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
100, null, "val20"))
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
400, null, "val15"))
+producer.send(new ProducerRecord[String, String](topicName, 0, time + 
250, null, "val15"))

Review Comment:
   Do we need a separate test for this? I mentioned how to test here: 
https://github.com/apache/kafka/pull/15474#discussion_r1516625703



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-7632: Support Compression Levels (KIP-390) [kafka]

2024-03-11 Thread via GitHub


mimaison opened a new pull request, #15516:
URL: https://github.com/apache/kafka/pull/15516

   Based on https://github.com/apache/kafka/pull/5927
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] dummy PR to check acls [kafka]

2024-03-11 Thread via GitHub


JobseRyan closed pull request #15515: dummy PR to check acls
URL: https://github.com/apache/kafka/pull/15515


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] dummy PR to check acls [kafka]

2024-03-11 Thread via GitHub


JobseRyan opened a new pull request, #15515:
URL: https://github.com/apache/kafka/pull/15515

   dummy PR
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] User/rjobse/add xinfra identifier check [kafka]

2024-03-11 Thread via GitHub


JobseRyan closed pull request #15514: User/rjobse/add xinfra identifier check
URL: https://github.com/apache/kafka/pull/15514


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] User/rjobse/add xinfra identifier check [kafka]

2024-03-11 Thread via GitHub


JobseRyan opened a new pull request, #15514:
URL: https://github.com/apache/kafka/pull/15514

   Pull request to add a check for the client type and library version to check 
if the client which is accessing this broker is a xinfra client
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-11 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1520206161


##
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##
@@ -36,40 +37,79 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
 
   val topicName = "foo"
   var adminClient: Admin = _
+  var setOldMessageFormat: Boolean = false
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
 super.setUp(testInfo)
 createTopic(topicName, 1, 1.toShort)
-produceMessages()
 adminClient = Admin.create(Map[String, Object](
   AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers()
 ).asJava)
   }
 
   @AfterEach
   override def tearDown(): Unit = {
+setOldMessageFormat = false
 Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
 super.tearDown()
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
-  def testEarliestOffset(quorum: String): Unit = {
-val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
-assertEquals(0, earliestOffset.offset())
+  def testThreeCompressedRecordsInOneBatch(quorum: String): Unit = {

Review Comment:
   How about using `CsvSource` to list all cases? for example:
   ```scala
 @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
 @CsvSource(Array(
   "zk, true, true, true",
   "zk, true, true, false",
   "zk, true, false, true",
   "zk, true, false, false",
   "zk, false, true, true",
   "zk, false, true, false",
   //"zk, false, false, true", KAFKA-16341 will enable it 
   "zk, false, false, false",
   "kraft, true, false, true",
   "kraft, true, false, false",
   //"kraft, false, false, true", KAFKA-16341 will enable it 
   "kraft, false, false, false"
 ))
 def test(quorum: String, compression: Boolean, oldMessage: Boolean, 
oneBatch: Boolean): Unit = {
   if (oldMessage) createOldMessageFormatBrokers()
   if (oneBatch) produceMessagesInOneBatch(if(compression) "gzip" else 
"none")
   else produceMessagesInSeparateBatch(if(compression) "gzip" else "none")
   verifyListOffsets()
 }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-11 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1520201655


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -230,8 +234,19 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
-break;
+final long outerJoinLookBackTimeMs = 
getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
+if (sharedTimeTracker.minTime + outerJoinLookBackTimeMs + 
joinGraceMs >= sharedTimeTracker.streamTime) {
+if (timestampedKeyAndJoinSide.isLeftSide()) {
+outerJoinLeftBreak = true; // there are no more 
candidates to emit on left-outerJoin-side
+} else {
+outerJoinRightBreak = true; // there are no more 
candidates to emit on right-outerJoin-side
+}
+if (outerJoinLeftBreak && outerJoinRightBreak) {

Review Comment:
   See: https://github.com/apache/kafka/pull/15510
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520191884


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -37,4 +42,28 @@ public static OptionalInt ofSentinel(int value) {
 public static OptionalLong ofSentinel(long value) {
 return value != -1 ? OptionalLong.of(value) : OptionalLong.empty();
 }
+
+/**
+ * @return The provided assignment as a String.

Review Comment:
   Could we maybe include and example string and what its components are?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16363) Storage crashes if dir is unavailable

2024-03-11 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825402#comment-17825402
 ] 

Igor Soarez commented on KAFKA-16363:
-

cc [~pprovenzano] 

> Storage crashes if dir is unavailable
> -
>
> Key: KAFKA-16363
> URL: https://issues.apache.org/jira/browse/KAFKA-16363
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> The storage tool crashes if one of the configured log directories is 
> unavailable. 
>  
> {code:java}
> sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID 
> -c server.properties
> [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file 
> /data/d2/meta.properties 
> (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble)
> java.nio.file.AccessDeniedException: /data/d2/meta.properties
>         at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>         at 
> java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218)
>         at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
>         at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
>         at 
> java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
>         at java.base/java.nio.file.Files.newInputStream(Files.java:160)
>         at 
> org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77)
>         at 
> org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135)
>         at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431)
>         at kafka.tools.StorageTool$.main(StorageTool.scala:95)
>         at kafka.tools.StorageTool.main(StorageTool.scala)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, 
> nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: 
> MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, 
> directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR})
> I/O error trying to read log directory /data/d2.
>  {code}
> When configured with multiple directories, Kafka tolerates some of them (but 
> not all) being inaccessible, so this tool should be able to handle the same 
> scenarios without crashing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16363) Storage crashes if dir is unavailable

2024-03-11 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-16363:
---

Assignee: Igor Soarez

> Storage crashes if dir is unavailable
> -
>
> Key: KAFKA-16363
> URL: https://issues.apache.org/jira/browse/KAFKA-16363
> Project: Kafka
>  Issue Type: Sub-task
>  Components: tools
>Affects Versions: 3.7.0
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> The storage tool crashes if one of the configured log directories is 
> unavailable. 
>  
> {code:java}
> sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID 
> -c server.properties
> [2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file 
> /data/d2/meta.properties 
> (org.apache.kafka.metadata.properties.MetaPropertiesEnsemble)
> java.nio.file.AccessDeniedException: /data/d2/meta.properties
>         at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
>         at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
>         at 
> java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218)
>         at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
>         at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
>         at 
> java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
>         at java.base/java.nio.file.Files.newInputStream(Files.java:160)
>         at 
> org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77)
>         at 
> org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135)
>         at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431)
>         at kafka.tools.StorageTool$.main(StorageTool.scala:95)
>         at kafka.tools.StorageTool.main(StorageTool.scala)
> metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
> dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, 
> nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: 
> MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, 
> directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR})
> I/O error trying to read log directory /data/d2.
>  {code}
> When configured with multiple directories, Kafka tolerates some of them (but 
> not all) being inaccessible, so this tool should be able to handle the same 
> scenarios without crashing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16363) Storage crashes if dir is unavailable

2024-03-11 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-16363:
---

 Summary: Storage crashes if dir is unavailable
 Key: KAFKA-16363
 URL: https://issues.apache.org/jira/browse/KAFKA-16363
 Project: Kafka
  Issue Type: Sub-task
  Components: tools
Affects Versions: 3.7.0
Reporter: Igor Soarez


The storage tool crashes if one of the configured log directories is 
unavailable. 

 
{code:java}
sh-4.4# ./bin/kafka-storage.sh format --ignore-formatted -t $KAFKA_CLUSTER_ID 
-c server.properties
[2024-03-11 17:51:05,391] ERROR Error while reading meta.properties file 
/data/d2/meta.properties 
(org.apache.kafka.metadata.properties.MetaPropertiesEnsemble)
java.nio.file.AccessDeniedException: /data/d2/meta.properties
        at 
java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:90)
        at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
        at 
java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at 
java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:218)
        at java.base/java.nio.file.Files.newByteChannel(Files.java:380)
        at java.base/java.nio.file.Files.newByteChannel(Files.java:432)
        at 
java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:422)
        at java.base/java.nio.file.Files.newInputStream(Files.java:160)
        at 
org.apache.kafka.metadata.properties.PropertiesUtils.readPropertiesFile(PropertiesUtils.java:77)
        at 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble$Loader.load(MetaPropertiesEnsemble.java:135)
        at kafka.tools.StorageTool$.formatCommand(StorageTool.scala:431)
        at kafka.tools.StorageTool$.main(StorageTool.scala:95)
        at kafka.tools.StorageTool.main(StorageTool.scala)
metaPropertiesEnsemble=MetaPropertiesEnsemble(metadataLogDir=Optional.empty, 
dirs={/data/d1: MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, 
nodeId=101, directoryId=zm7fSw3zso9aR0AtuzsI_A), /data/metadata: 
MetaProperties(version=1, clusterId=RwO2UIkmTBWltwRllP05aA, nodeId=101, 
directoryId=eRO8vOP7ddbpx_W2ZazjLw), /data/d2: ERROR})
I/O error trying to read log directory /data/d2.
 {code}
When configured with multiple directories, Kafka tolerates some of them (but 
not all) being inaccessible, so this tool should be able to handle the same 
scenarios without crashing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520171958


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1796,12 +1834,12 @@ public void onLoaded() {
 consumerGroup.members().forEach((memberId, member) -> {
 log.debug("Loaded member {} in consumer group {}.", 
memberId, groupId);
 scheduleConsumerGroupSessionTimeout(groupId, memberId);
-if (member.state() == 
ConsumerGroupMember.MemberState.REVOKING) {
-scheduleConsumerGroupRevocationTimeout(
+if (member.state() == 
MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
 groupId,
-memberId,
-member.rebalanceTimeoutMs(),
-member.memberEpoch()
+member.memberId(),

Review Comment:
   did we change this from memberId to member.memberId just to be consistent 
with the other arguments?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
  * @param groupId   The group id.
  * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param memberEpoch   The member epoch.
+ * @param rebalanceTimeoutMsThe rebalance timeout.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
 String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+int memberEpoch,
+int rebalanceTimeoutMs
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
+timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} 
because the member " +
-"state does not match the expected state.", groupId, 
memberId);
+if (member.memberEpoch() == memberEpoch) {
+log.info("[GroupId {}] Member {} fenced from the group 
because " +
+"it failed to transition from epoch {} within 
{}ms.",
+groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+return new 
CoordinatorResult<>(consumerGroupFenceMember(group, member));

Review Comment:
   So it seems like this is one of the main changes here -- we don't validate 
on the revoking state -- I guess we could be in a revoking state for the next 
assignment...
   
   I may have asked this on a previous pr, but are we assuming the member epoch 
of the member (not the one passed in) is always never less than the member 
epoch passed into this method. That makes sense given the epoch is 
monotonically increasing, but just wanted to confirm.
   
   As an aside, when we fence a group member, do we basically kick it out of 
the group and force it to rejoin? 
   Can the client rejoin without restarting?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16362) Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825397#comment-17825397
 ] 

Greg Harris commented on KAFKA-16362:
-

cc [~mjsax] [~ableegoldman] I looked through the (currently ignored) rawtypes 
warnings in Streams and this was one that I really didn't have a simple 
resolution for, and I think needs a real refactor to make type-safe.

I don't think there's a bug hidden here, but the code didn't give me any 
confidence on that. I adjusted a test to prove to myself that the current 
implementation is correct, even if it is type-unsafe: 
[https://github.com/apache/kafka/pull/15513] 

Given the number of times that rawtypes are used in Streams internals, I wasn't 
sure if this was even a concern for you, and if you're interested in reducing 
the number of rawtypes used in streams over the long term.

> Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide
> 
>
> Key: KAFKA-16362
> URL: https://issues.apache.org/jira/browse/KAFKA-16362
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Priority: Trivial
>  Labels: newbie++
>
> The implementation of KStreamKStreamJoin has several places that the compiler 
> emits warnings for, that are later suppressed or ignored:
>  * LeftOrRightValue.make returns a raw LeftOrRightValue without generic 
> arguments, because the generic type arguments depend on the boolean input.
>  * Calling LeftOrRightValue includes an unchecked cast before inserting the 
> record into the outerJoinStore
>  * emitNonJoinedOuterRecords swaps the left and right values, and performs an 
> unchecked cast
> These seem to be closely related to the isLeftSide variable, which makes the 
> class behave differently whether it is present on the left or right side of a 
> join.
> We should figure out if these warnings can be eliminated by a refactor, 
> perhaps into KStreamKstreamJoin.Left and KStreamKStreamJoin.Right, or with 
> some generic arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
  * @param groupId   The group id.
  * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param memberEpoch   The member epoch.
+ * @param rebalanceTimeoutMsThe rebalance timeout.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
 String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+int memberEpoch,
+int rebalanceTimeoutMs
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
+timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} 
because the member " +
-"state does not match the expected state.", groupId, 
memberId);
+if (member.memberEpoch() == memberEpoch) {
+log.info("[GroupId {}] Member {} fenced from the group 
because " +
+"it failed to transition from epoch {} within 
{}ms.",
+groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+return new 
CoordinatorResult<>(consumerGroupFenceMember(group, member));

Review Comment:
   So it seems like this is one of the main changes here -- we don't validate 
on the revoking state.
   
   I may have asked this on a previous pr, but are we assuming the member epoch 
of the member (not the one passed in) is always never less than the member 
epoch passed into this method. That makes sense given the epoch is 
monotonically increasing, but just wanted to confirm.
   
   As an aside, when we fence a group member, do we basically kick it out of 
the group and force it to rejoin? 
   Can the client rejoin without restarting?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types [kafka]

2024-03-11 Thread via GitHub


gharris1727 opened a new pull request, #15513:
URL: https://github.com/apache/kafka/pull/15513

   This test uses the same value types on the left and right, and so wouldn't 
be sensitive to a mixup between left and right values. So I changed one of the 
stream types to `Long`, and updated the assertions to match.
   
   Because the implementation of the KStreamKstreamOuterJoin is not type-safe, 
it was unclear just from the code that a mixup was not possible.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
  * @param groupId   The group id.
  * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param memberEpoch   The member epoch.
+ * @param rebalanceTimeoutMsThe rebalance timeout.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
 String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+int memberEpoch,
+int rebalanceTimeoutMs
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
+timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} 
because the member " +
-"state does not match the expected state.", groupId, 
memberId);
+if (member.memberEpoch() == memberEpoch) {
+log.info("[GroupId {}] Member {} fenced from the group 
because " +
+"it failed to transition from epoch {} within 
{}ms.",
+groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+return new 
CoordinatorResult<>(consumerGroupFenceMember(group, member));

Review Comment:
   So it seems like this is one of the main changes here. 
   When we fence a group member, do we basically kick it out of the group and 
force it to rejoin? 
   Can the client rejoin without restarting?
   
   I may have asked this on a previous pr, but are we assuming the member epoch 
of the member (not the one passed in) is always never less than the member 
epoch passed into this method. That makes sense given the epoch is 
monotonically increasing, but just wanted to confirm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Fix incorrect syntax for config [kafka]

2024-03-11 Thread via GitHub


mjsax commented on PR #15500:
URL: https://github.com/apache/kafka/pull/15500#issuecomment-1989056918

   Merged to `trunk` and cherry-picked to `3.7` and `3.6` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520152684


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1400,35 +1440,35 @@ private void cancelConsumerGroupSessionTimeout(
 }
 
 /**
- * Schedules a revocation timeout for the member.
+ * Schedules a rebalance timeout for the member.
  *
  * @param groupId   The group id.
  * @param memberId  The member id.
- * @param revocationTimeoutMs   The revocation timeout.
- * @param expectedMemberEpoch   The expected member epoch.
+ * @param memberEpoch   The member epoch.
+ * @param rebalanceTimeoutMsThe rebalance timeout.
  */
-private void scheduleConsumerGroupRevocationTimeout(
+private void scheduleConsumerGroupRebalanceTimeout(
 String groupId,
 String memberId,
-long revocationTimeoutMs,
-int expectedMemberEpoch
+int memberEpoch,
+int rebalanceTimeoutMs
 ) {
-String key = consumerGroupRevocationTimeoutKey(groupId, memberId);
-timer.schedule(key, revocationTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
+String key = consumerGroupRebalanceTimeoutKey(groupId, memberId);
+timer.schedule(key, rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, 
() -> {
 try {
 ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, 
false);
 ConsumerGroupMember member = 
group.getOrMaybeCreateMember(memberId, false);
 
-if (member.state() != ConsumerGroupMember.MemberState.REVOKING 
||
-member.memberEpoch() != expectedMemberEpoch) {
-log.debug("[GroupId {}] Ignoring revocation timeout for {} 
because the member " +
-"state does not match the expected state.", groupId, 
memberId);
+if (member.memberEpoch() == memberEpoch) {
+log.info("[GroupId {}] Member {} fenced from the group 
because " +
+"it failed to transition from epoch {} within 
{}ms.",
+groupId, memberId, memberEpoch, rebalanceTimeoutMs);
+return new 
CoordinatorResult<>(consumerGroupFenceMember(group, member));

Review Comment:
   So it seems like this is one of the main changes here. 
   When we fence a group member, do we basically kick it out of the group and 
force it to rejoin? 
   Can the client do this without restarting?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16362) Fix type-unsafety in KStreamKStreamJoin caused by isLeftSide

2024-03-11 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16362:
---

 Summary: Fix type-unsafety in KStreamKStreamJoin caused by 
isLeftSide
 Key: KAFKA-16362
 URL: https://issues.apache.org/jira/browse/KAFKA-16362
 Project: Kafka
  Issue Type: Task
  Components: streams
Affects Versions: 3.7.0
Reporter: Greg Harris


The implementation of KStreamKStreamJoin has several places that the compiler 
emits warnings for, that are later suppressed or ignored:
 * LeftOrRightValue.make returns a raw LeftOrRightValue without generic 
arguments, because the generic type arguments depend on the boolean input.
 * Calling LeftOrRightValue includes an unchecked cast before inserting the 
record into the outerJoinStore
 * emitNonJoinedOuterRecords swaps the left and right values, and performs an 
unchecked cast

These seem to be closely related to the isLeftSide variable, which makes the 
class behave differently whether it is present on the left or right side of a 
join.

We should figure out if these warnings can be eliminated by a refactor, perhaps 
into KStreamKstreamJoin.Left and KStreamKStreamJoin.Right, or with some generic 
arguments.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: update kraft_upgrade_test to create a new topic after metadata upgrade [kafka]

2024-03-11 Thread via GitHub


soarez commented on PR #15451:
URL: https://github.com/apache/kafka/pull/15451#issuecomment-1989015331

   @showuon could you have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520125834


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;

Review Comment:
   Did we follow up here as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520116151


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
 // from the target ISR, but we need to exclude it here too, to handle 
the case
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
+//
+// If the caller passed a valid broker ID for 
brokerWithUncleanShutdown, rather than
+// passing NO_LEADER, this node should not be an acceptable leader. We 
also exclude
+// brokerWithUncleanShutdown from ELR and ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+&& (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
   I disabled the handleUncleanShutdown if ELR is not enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115442


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
 }
 }
 
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+if (validationSet.size() != newPartInfo.isr.length + 
newPartInfo.elr.length) {
+log.warn("{}-{} has overlapping ISR={} and ELR={}", 
topics.get(topicId).name, partitionId,
+Arrays.toString(newPartInfo.isr), partitionId, 
Arrays.toString(newPartInfo.elr));

Review Comment:
   Correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1520115750


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -336,10 +350,10 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
 if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   Good catch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on code in PR #15364:
URL: https://github.com/apache/kafka/pull/15364#discussion_r1520113443


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1211,13 +1192,99 @@ private 
CoordinatorResult consumerGr
 // 1. The member reported its owned partitions;
 // 2. The member just joined or rejoined to group (epoch equals to 
zero);
 // 3. The member's assignment has been updated.
-if (ownedTopicPartitions != null || memberEpoch == 0 || 
assignmentUpdated) {
+if (ownedTopicPartitions != null || memberEpoch == 0 || 
hasAssignedPartitionsChanged(member, updatedMember)) {
 response.setAssignment(createResponseAssignment(updatedMember));
 }
 
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Reconciles the current assignment of the member if needed.
+ *
+ * @param groupId   The group id.
+ * @param memberThe member to reconcile.
+ * @param currentPartitionEpoch The function returning the current epoch of
+ *  a given partition.
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment  The target assignment.
+ * @param ownedTopicPartitions  The list of partitions owned by the 
member. This
+ *  is reported in the ConsumerGroupHeartbeat 
API and
+ *  it could be null if not provided.
+ * @param records   The list to accumulate any new records.
+ * @return The received member if no changes have been made; or a new
+ * member containing the new assignment.
+ */
+private ConsumerGroupMember maybeReconcile(
+String groupId,
+ConsumerGroupMember member,
+BiFunction currentPartitionEpoch,
+int targetAssignmentEpoch,
+Assignment targetAssignment,
+List 
ownedTopicPartitions,
+List records
+) {
+if (member.isReconciledTo(targetAssignmentEpoch)) {
+return member;
+}
+
+ConsumerGroupMember updatedMember = new 
CurrentAssignmentBuilder(member)
+.withTargetAssignment(targetAssignmentEpoch, targetAssignment)
+.withCurrentPartitionEpoch(currentPartitionEpoch)
+.withOwnedTopicPartitions(ownedTopicPartitions)
+.build();
+
+if (!updatedMember.equals(member)) {
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+
+log.info("[GroupId {}] Member {} new assignment state: epoch={}, 
previousEpoch={}, state={}, "
+ + "assignedPartitions={} and revokedPartitions={}.",
+groupId, updatedMember.memberId(), 
updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), 
updatedMember.state(),
+formatAssignment(updatedMember.assignedPartitions()), 
formatAssignment(updatedMember.revokedPartitions()));
+
+if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
+scheduleConsumerGroupRebalanceTimeout(
+groupId,
+updatedMember.memberId(),
+updatedMember.memberEpoch(),
+updatedMember.rebalanceTimeoutMs()
+);
+} else {

Review Comment:
   Did we come to a resolution here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16249; Improve reconciliation state machine [kafka]

2024-03-11 Thread via GitHub


jolshan commented on PR #15364:
URL: https://github.com/apache/kafka/pull/15364#issuecomment-1988980430

   >  This is a non-backward compatible change. I think that we should do this 
change to cleanup the record. As KIP-848 is only in early access in 3.7 and 
that we clearly state that we don't plane to support upgrade from it, this is 
acceptable in my opinion.
   
   Are we gating this record change under anything?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy

2024-03-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16255:
--
Priority: Minor  (was: Major)

> AsyncKafkaConsumer should not use partition.assignment.strategy
> ---
>
> Key: KAFKA-16255
> URL: https://issues.apache.org/jira/browse/KAFKA-16255
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The {{partition.assignment.strategy}} configuration is used to specify a list 
> of zero or more {{ConsumerPartitionAssignor}} instances. However, that 
> interface is not applicable for the KIP-848-based protocol on top of which 
> {{AsyncKafkaConsumer}} is built. Therefore, the use of 
> {{ConsumerPartitionAssignor}} is inappropriate and should be removed from 
> {{{}AsyncKafkaConsumer{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15553) Review consumer positions update

2024-03-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15553:
--
Priority: Minor  (was: Major)

> Review consumer positions update
> 
>
> Key: KAFKA-15553
> URL: https://issues.apache.org/jira/browse/KAFKA-15553
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, position
> Fix For: 3.8.0
>
>
> From the existing comment: If there are any partitions which do not have a 
> valid position and are not awaiting reset, then we need to fetch committed 
> offsets.
> In the async consumer: I wonder if it would make sense to refresh the 
> position on the event loop continuously.
> The logic to refresh offsets in the poll loop is quite fragile and works 
> largely by side-effects of the code that it calls. For example, the behaviour 
> of the "cached" value is really not that straightforward and simply reading 
> the cached value is not sufficient to start consuming data in all cases.
> This area needs a bit of a refactor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16256) Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol

2024-03-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16256:
--
Priority: Minor  (was: Major)

> Update ConsumerConfig to validate use of group.remote.assignor and 
> partition.assignment.strategy based on group.protocol
> 
>
> Key: KAFKA-16256
> URL: https://issues.apache.org/jira/browse/KAFKA-16256
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> {{ConsumerConfig}} supports both the {{group.remote.assignor}} and 
> {{partition.assignment.strategy}} configuration options. These, however, 
> should not be used together; the former is applicable only when the 
> {{group.protocol}} is set to {{consumer}} and the latter when the 
> {{group.protocol}} is set to {{{}classic{}}}. We should emit a warning if the 
> user specifies the incorrect configuration based on the value of 
> {{{}group.protocol{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll

2024-03-11 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16298:
-

Assignee: Kirk True

> Ensure user callbacks exceptions are propagated to the user on consumer poll
> 
>
> Key: KAFKA-16298
> URL: https://issues.apache.org/jira/browse/KAFKA-16298
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Lianet Magrans
>Assignee: Kirk True
>Priority: Blocker
>  Labels: callback, kip-848-client-support
> Fix For: 3.8.0
>
>
> When user-defined callbacks fail with an exception, the expectation is that 
> the error should be propagated to the user as a KafkaExpception and break the 
> poll loop (behaviour in the legacy coordinator). The new consumer executes 
> callbacks in the application thread, and sends an event to the background 
> with the callback result and error if any, [passing the error along with the 
> event 
> here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1882]
>  to the background thread, but does not seem to propagate the exception to 
> the user. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16361) Rack aware sticky assignor minQuota violations

2024-03-11 Thread Luke D (Jira)
Luke D created KAFKA-16361:
--

 Summary: Rack aware sticky assignor minQuota violations
 Key: KAFKA-16361
 URL: https://issues.apache.org/jira/browse/KAFKA-16361
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.6.1, 3.7.0, 3.5.1
Reporter: Luke D


In some low topic replication scenarios the rack aware assignment in the 
StickyAssignor fails to balance consumers to its own expectations and throws an 
IllegalStateException, commonly crashing the application (depending on 
application implementation). While uncommon the error is deterministic, and so 
persists until the replication state changes. 

 

We have observed this in the wild in 3.5.1, and 3.6.1. We have reproduced it 
locally in a test case in 3.6.1 and 3.7.0 (3.5.1 we did not try but likely 
would also be reproducible there) 

 

Here is the error and stack from our test case against 3.7.0
{code:java}
We haven't reached the expected number of members with more than the minQuota 
partitions, but no more partitions to be assigned
java.lang.IllegalStateException: We haven't reached the expected number of 
members with more than the minQuota partitions, but no more partitions to be 
assigned
    at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.verifyUnfilledMembers(AbstractStickyAssignor.java:820)
    at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$ConstrainedAssignmentBuilder.build(AbstractStickyAssignor.java:652)
    at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assignPartitions(AbstractStickyAssignor.java:113)
    at 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.assign(AbstractPartitionAssignor.java:91)
 {code}
Here is a specific test case from 3.7.0 that fails when passed to 
StickyAssignor.assign:
{code:java}
Cluster(id = cluster-id, nodes = [host-3:1 (id: 4 rack: rack-3), host-3:1 (id: 
3 rack: rack-3), host-2:1 (id: 2 rack: rack-2), host-1:1 (id: 1 rack: rack-1)], 
partitions = [Partition(topic = topic_name, partition = 57, leader = 4, 
replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = topic_name, 
partition = 90, leader = 2, replicas = [2], isr = [2], offlineReplicas = []), 
Partition(topic = topic_name, partition = 28, leader = 3, replicas = [3], isr = 
[3], offlineReplicas = []), Partition(topic = topic_name, partition = 53, 
leader = 4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
topic_name, partition = 86, leader = 2, replicas = [2], isr = [2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 24, leader = 
4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = 
topic_name, partition = 49, leader = 1, replicas = [1,2], isr = [1,2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 82, leader = 
4, replicas = [4,2], isr = [4,2], offlineReplicas = []), Partition(topic = 
topic_name, partition = 20, leader = 2, replicas = [2,1], isr = [2,1], 
offlineReplicas = []), Partition(topic = topic_name, partition = 45, leader = 
2, replicas = [2], isr = [2], offlineReplicas = []), Partition(topic = 
topic_name, partition = 78, leader = 1, replicas = [1], isr = [1], 
offlineReplicas = []), Partition(topic = topic_name, partition = 16, leader = 
4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
topic_name, partition = 41, leader = 1, replicas = [1,2], isr = [1,2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 74, leader = 
4, replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = 
topic_name, partition = 12, leader = 2, replicas = [2], isr = [2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 37, leader = 
1, replicas = [1], isr = [1], offlineReplicas = []), Partition(topic = 
topic_name, partition = 70, leader = 2, replicas = [2], isr = [2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 8, leader = 4, 
replicas = [4,3,1], isr = [4,3,1], offlineReplicas = []), Partition(topic = 
topic_name, partition = 33, leader = 1, replicas = [1], isr = [1], 
offlineReplicas = []), Partition(topic = topic_name, partition = 66, leader = 
4, replicas = [4], isr = [4], offlineReplicas = []), Partition(topic = 
topic_name, partition = 4, leader = 2, replicas = [2], isr = [2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 29, leader = 
3, replicas = [3,1,2], isr = [3,1,2], offlineReplicas = []), Partition(topic = 
topic_name, partition = 62, leader = 3, replicas = [3,2,1], isr = [3,2,1], 
offlineReplicas = []), Partition(topic = topic_name, partition = 95, leader = 
4, replicas = [4,3,2], isr = [4,3,2], offlineReplicas = []), Partition(topic = 
topic_name, partition = 0, leader = 4, replicas = [4,1,2], isr = [4,1,2], 
offlineReplicas = []), Partition(topic = topic_name, partition = 25, leader 

[jira] [Commented] (KAFKA-16360) Release plan of 3.x kafka releases.

2024-03-11 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825386#comment-17825386
 ] 

Justine Olshan commented on KAFKA-16360:


Hey there – there was some discussion on the mailing list. 3.8 should be the 
last release. See here: 
[https://lists.apache.org/thread/kvdp2gmq5gd9txkvxh5vk3z2n55b04s5] 
There is also a KIP. KIP-1012: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8.x+release]
Hope this clears things up. 3.8 should be the last release.

> Release plan of 3.x kafka releases.
> ---
>
> Key: KAFKA-16360
> URL: https://issues.apache.org/jira/browse/KAFKA-16360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Major
>
> KIP 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline]
>  mentions ,
> h2. Kafka 3.7
>  * January 2024
>  * Final release with ZK mode
> But we see in Jira, some tickets are marked for 3.8 release. Does apache 
> continue to make 3.x releases having zookeeper and kraft supported 
> independent of pure kraft 4.x releases ?
> If yes, how many more releases can be expected on 3.x release line ?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16360) Release plan of 3.x kafka releases.

2024-03-11 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825385#comment-17825385
 ] 

Greg Harris commented on KAFKA-16360:
-

Hi [~kaushik srinivas], thanks for your question!

The release schedule in that KIP was superseded by a later KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1012%3A+The+need+for+a+Kafka+3.8.x+release]
 and so is not accurate any longer.

At this time, we expect that 3.8 will be the last 3.x release, but that could 
change depending on which features are ready in that release.

> Release plan of 3.x kafka releases.
> ---
>
> Key: KAFKA-16360
> URL: https://issues.apache.org/jira/browse/KAFKA-16360
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kaushik srinivas
>Priority: Major
>
> KIP 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline]
>  mentions ,
> h2. Kafka 3.7
>  * January 2024
>  * Final release with ZK mode
> But we see in Jira, some tickets are marked for 3.8 release. Does apache 
> continue to make 3.x releases having zookeeper and kraft supported 
> independent of pure kraft 4.x releases ?
> If yes, how many more releases can be expected on 3.x release line ?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16360) Release plan of 3.x kafka releases.

2024-03-11 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-16360:


 Summary: Release plan of 3.x kafka releases.
 Key: KAFKA-16360
 URL: https://issues.apache.org/jira/browse/KAFKA-16360
 Project: Kafka
  Issue Type: Improvement
Reporter: kaushik srinivas


KIP 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-833%3A+Mark+KRaft+as+Production+Ready#KIP833:MarkKRaftasProductionReady-ReleaseTimeline]
 mentions ,
h2. Kafka 3.7
 * January 2024
 * Final release with ZK mode

But we see in Jira, some tickets are marked for 3.8 release. Does apache 
continue to make 3.x releases having zookeeper and kraft supported independent 
of pure kraft 4.x releases ?

If yes, how many more releases can be expected on 3.x release line ?

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-11 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825373#comment-17825373
 ] 

Gaurav Narula commented on KAFKA-16359:
---

[~apoorvmittal10] I stumbled upon 
[https://github.com/johnrengelman/shadow/issues/324] and figured it might be 
useful when you take this up.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: it's {{META-INF/MANIFEST.MF}} bogusly include a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in it's {{META-INF/MANIFEST.MF}} or 
> a new release should be published that corrects this defect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16227: Avoid IllegalStateException during fetch initialization [kafka]

2024-03-11 Thread via GitHub


cadonna commented on code in PR #15491:
URL: https://github.com/apache/kafka/pull/15491#discussion_r1519991935


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -278,32 +278,48 @@ private CompletedFetch handleInitializeSuccess(final 
CompletedFetch completedFet
 }
 }
 
-if (partition.highWatermark() >= 0) {
-log.trace("Updating high watermark for partition {} to {}", tp, 
partition.highWatermark());
-subscriptions.updateHighWatermark(tp, partition.highWatermark());
+if (!updatePartitionState(partition, tp)) {
+return null;
+}
+
+completedFetch.setInitialized();
+return completedFetch;
+}
+
+private boolean updatePartitionState(final FetchResponseData.PartitionData 
partitionData,
+ final TopicPartition tp) {
+if (partitionData.highWatermark() >= 0) {
+log.trace("Updating high watermark for partition {} to {}", tp, 
partitionData.highWatermark());
+if (!subscriptions.tryUpdatingHighWatermark(tp, 
partitionData.highWatermark())) {

Review Comment:
   Yep!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16227: Avoid IllegalStateException during fetch initialization [kafka]

2024-03-11 Thread via GitHub


cadonna commented on PR #15491:
URL: https://github.com/apache/kafka/pull/15491#issuecomment-1988816400

   > LGTM! I think it's a good solution in the current architecture, though 
arguably a bit smelly.
   
   Agree on the arguable smell. The alternative would be to use locks which I 
disliked because locks are not needed for the legacy consumer.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16354) FinalizedFeatureChangeListenerTest should use mocked latches

2024-03-11 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825365#comment-17825365
 ] 

PoAn Yang commented on KAFKA-16354:
---

Hi [~gharris1727], I am interested in this. May I assign to myself? Thank you.

> FinalizedFeatureChangeListenerTest should use mocked latches
> 
>
> Key: KAFKA-16354
> URL: https://issues.apache.org/jira/browse/KAFKA-16354
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Affects Versions: 3.7.0
>Reporter: Greg Harris
>Priority: Trivial
>  Labels: newbie
>
> testCacheUpdateWaitFailsForUnreachableVersion takes 30 seconds, and 
> testInitFailureDueToFeatureIncompatibility takes 5 seconds. This appears to 
> be caused by FeatureCacheUpdater#awaitUpdateOrThrow waiting for a real 
> CountDownLatch with a real-time timeout.
> Instead, a mocked latch should be used to exit the await immediately.
> This should be done both for CPU-independence, and for test execution speed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-12187: replace assertTrue(obj instanceof X) with assertInstanceOf [kafka]

2024-03-11 Thread via GitHub


brandboat opened a new pull request, #15512:
URL: https://github.com/apache/kafka/pull/15512

   as title, replace 
   - `assertTrue(obj instanceof X)` with `assertInstanceOf(X.class, obj)`
   - `assertTrue(obj instanceof X, errormessge)` with 
`assertInstanceOf(X.class, obj, errormessage)`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-11 Thread via GitHub


lucasbru commented on PR #15511:
URL: https://github.com/apache/kafka/pull/15511#issuecomment-1988762078

   @lianetm @dajac Could you please have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16114) Fix partiton not retention after cancel alter intra broker log dir task

2024-03-11 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825363#comment-17825363
 ] 

Divij Vaidya commented on KAFKA-16114:
--

Sorry [~albedooo] , I won't have bandwidth any time soon to look into this.

> Fix partiton not retention after cancel alter intra broker log dir task 
> 
>
> Key: KAFKA-16114
> URL: https://issues.apache.org/jira/browse/KAFKA-16114
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 3.3.2, 3.6.1
>Reporter: wangliucheng
>Priority: Major
>
> The deletion thread will not work on partition after cancel alter intra 
> broker log dir task 
> The steps to reproduce are as follows:
> 1、Create reassignment.json file
> test01-1 on the /data01/kafka/log01 directory of the broker 1003,then move to 
> /data01/kafka/log02 
> {code:java}
> {
>     "version": 1,    
>     "partitions": [
>         {
>             "topic": "test01",  
>             "partition": 1,
>             "replicas": [1001,1003],
>             "log_dirs": ["any","/data01/kafka/log02"]
>         }
>     ]
> }{code}
> 2、Kick off the reassignment
> {code:java}
> bin/kafka-reassign-partitions.sh -bootstrap-server localhost:9092 
> --reassignment-json-file reassignment.json  -execute {code}
> 3、Cancel the reassignment
> {code:java}
> bin/kafka-reassign-partitions.sh -bootstrap-server localhost:9092 
> --reassignment-json-file reassignment.json  -cancel {code}
> 4、Result, The partition test01-1 on 1003 will not be deleted 
> The reason for this problem is the partition has been filtered:
> {code:java}
> val deletableLogs = logs.filter {
>   case (_, log) => !log.config.compact // pick non-compacted logs
> }.filterNot {
>   case (topicPartition, _) => inProgress.contains(topicPartition) // skip any 
> logs already in-progress
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16312, KAFKA-16185: Local epochs in reconciliation [kafka]

2024-03-11 Thread via GitHub


lucasbru opened a new pull request, #15511:
URL: https://github.com/apache/kafka/pull/15511

   The goal of this PR is to change the following internals of the 
reconciliation:
   
- Introduce a "local epoch" to the local target assignment. When a new 
target is received by the server, we compare it with the current value. If it 
is the same, no change. Otherwise, we bump the local epoch and store the new 
target assignment. Then, on the reconciliation, we also store the epoch in the 
reconciled assignment and keep using target != current to trigger the 
reconciliation. 
- When we are not in a group (we have not received an assignment), we use 
null to represent the local target assignment instead of an empty list, to 
avoid confusions with an empty assignment received by the server. Similarly, we 
use null to represent the current assignment, when we haven't reconciled the 
assignment yet.
- We also carry the new epoch into the request builder to ensure that we 
report the owned partitions for the last local epoch.
- To address 
[KAFKA-16312](https://issues.apache.org/jira/browse/KAFKA-16312) (call 
onPartitionsAssigned on empty assignments after joining), we apply the initial 
assignment returned by the group coordinator (whether empty or not) as a normal 
reconciliation. This avoids introducing another code path to trigger rebalance 
listeners - reconciliation is the only way to transition to STABLE. The 
unneeded parts of reconciliation (autocommit, revocation) will be skipped in 
the existing. Since a lot of unit tests assumed that not reconciliation 
behavior is invoked when joining the group with an empty assignment, this 
required a lot of the changes in the unit tests.
   
   ## Testing
   
   These changes allow the new consumer to pass the first 10 system tests. We 
piggy-back a minor change to the `HeartbeatManager` that are required for those 
system tests as well: always send `rebalanceTimoutMs`, `subscriptions` when 
(re-)joining.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16227: Avoid IllegalStateException during fetch initialization [kafka]

2024-03-11 Thread via GitHub


lucasbru commented on code in PR #15491:
URL: https://github.com/apache/kafka/pull/15491#discussion_r1519911603


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##
@@ -278,32 +278,48 @@ private CompletedFetch handleInitializeSuccess(final 
CompletedFetch completedFet
 }
 }
 
-if (partition.highWatermark() >= 0) {
-log.trace("Updating high watermark for partition {} to {}", tp, 
partition.highWatermark());
-subscriptions.updateHighWatermark(tp, partition.highWatermark());
+if (!updatePartitionState(partition, tp)) {
+return null;
+}
+
+completedFetch.setInitialized();
+return completedFetch;
+}
+
+private boolean updatePartitionState(final FetchResponseData.PartitionData 
partitionData,
+ final TopicPartition tp) {
+if (partitionData.highWatermark() >= 0) {
+log.trace("Updating high watermark for partition {} to {}", tp, 
partitionData.highWatermark());
+if (!subscriptions.tryUpdatingHighWatermark(tp, 
partitionData.highWatermark())) {

Review Comment:
   So we are introducing all these "try" things to avoid races, I suppose?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]

2024-03-11 Thread via GitHub


mumrah commented on code in PR #14706:
URL: https://github.com/apache/kafka/pull/14706#discussion_r1519830343


##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
 }
 }
 
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));
+if (validationSet.size() != newPartInfo.isr.length + 
newPartInfo.elr.length) {
+log.warn("{}-{} has overlapping ISR={} and ELR={}", 
topics.get(topicId).name, partitionId,
+Arrays.toString(newPartInfo.isr), partitionId, 
Arrays.toString(newPartInfo.elr));

Review Comment:
   This can only happen if we have a bug where the ELR and ISR are allowed to 
overlap right? 
   
   Since this is part of the `replay`, we shouldn't throw here (since the 
record has already been committed), but perhaps an ERROR is better than a WARN.



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -336,10 +350,10 @@ public ControllerResult 
registerBroker(
 ", but got cluster ID " + request.clusterId());
 }
 int brokerId = request.brokerId();
+List records = new ArrayList<>();
 BrokerRegistration existing = brokerRegistrations.get(brokerId);
 if (version < 2 || existing == null || request.previousBrokerEpoch() 
!= existing.epoch()) {
-// TODO(KIP-966): Update the ELR if the broker has an unclean 
shutdown.
-log.debug("Received an unclean shutdown request");

Review Comment:
   I think we inadvertently lost this log message.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -2240,6 +2283,25 @@ private void updatePartitionDirectories(
 }
 }
 
+private void updatePartitionInfo(
+Uuid topicId,
+Integer partitionId,
+PartitionRegistration prevPartInfo,
+PartitionRegistration newPartInfo
+) {
+HashSet validationSet = new HashSet<>();
+Arrays.stream(newPartInfo.isr).forEach(ii -> validationSet.add(ii));
+Arrays.stream(newPartInfo.elr).forEach(ii -> validationSet.add(ii));

Review Comment:
   nit: i think you can do `forEach(validationSet::add)` here



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1815,12 +1845,17 @@ void validateManualPartitionAssignment(
  *  broker to remove from the ISR and leadership, 
otherwise.
  * @param brokerToAdd   NO_LEADER if no broker is being added; the ID 
of the
  *  broker which is now eligible to be a leader, 
otherwise.
+ * @param brokerWithUncleanShutdown

Review Comment:
   nit: update the main description of this method to mention ISR and ELR



##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -780,4 +793,9 @@ public Entry> next() {
 }
 };
 }
+
+@FunctionalInterface
+interface BrokerUncleanShutdownHandler {
+void apply(int brokerId, List records);

Review Comment:
   nit: since we're defining an interface, we can use a more descriptive name 
than "apply" for the method. Maybe "addRecordsForShutdown" or something.



##
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##
@@ -1837,8 +1872,13 @@ void generateLeaderAndIsrUpdates(String context,
 // from the target ISR, but we need to exclude it here too, to handle 
the case
 // where there is an unclean leader election which chooses a leader 
from outside
 // the ISR.
+//
+// If the caller passed a valid broker ID for 
brokerWithUncleanShutdown, rather than
+// passing NO_LEADER, this node should not be an acceptable leader. We 
also exclude
+// brokerWithUncleanShutdown from ELR and ISR.
 IntPredicate isAcceptableLeader =
-r -> (r != brokerToRemove) && (r == brokerToAdd || 
clusterControl.isActive(r));
+r -> (r != brokerToRemove && r != brokerWithUncleanShutdown)
+&& (r == brokerToAdd || clusterControl.isActive(r));

Review Comment:
   Since our guards around ELR (if it's enabled or not) are in 
PartitionChangeBuilder, we need to make sure this logic is correct when ELR is 
not enabled due to MV. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: 

[jira] [Assigned] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-11 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal reassigned KAFKA-16359:
-

Assignee: Apoorv Mittal

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: it's {{META-INF/MANIFEST.MF}} bogusly include a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in it's {{META-INF/MANIFEST.MF}} or 
> a new release should be published that corrects this defect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16359) kafka-clients-3.7.0.jar published to Maven Central is defective

2024-03-11 Thread Apoorv Mittal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825333#comment-17825333
 ] 

Apoorv Mittal commented on KAFKA-16359:
---

Hi [~norrisjeremy] , thanks for reporting the issue. As [~gnarula] mentioned 
this comes as part of shadow plugin, I will look into the issue and should come 
back with right resolution.

> kafka-clients-3.7.0.jar published to Maven Central is defective
> ---
>
> Key: KAFKA-16359
> URL: https://issues.apache.org/jira/browse/KAFKA-16359
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.7.0
>Reporter: Jeremy Norris
>Assignee: Apoorv Mittal
>Priority: Critical
>
> The {{kafka-clients-3.7.0.jar}} that has been published to Maven Central is 
> defective: it's {{META-INF/MANIFEST.MF}} bogusly include a {{Class-Path}} 
> element:
> {code}
> Manifest-Version: 1.0
> Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
> .5.jar slf4j-api-1.7.36.jar
> {code}
> This bogus {{Class-Path}} element leads to compiler warnings for projects 
> that utilize it as a dependency:
> {code}
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar": 
> no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
>  no such file or directory
> [WARNING] [path] bad path element 
> ".../.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
>  no such file or directory
> {code}
> Either the {{kafka-clients-3.7.0.jar}} needs to be respun and published 
> without the bogus {{Class-Path}} element in it's {{META-INF/MANIFEST.MF}} or 
> a new release should be published that corrects this defect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


  1   2   >